较真儿学源码系列-PowerJob MapReduce源码分析

news2025/1/18 6:58:06

        PowerJob版本:4.3.2-main。

        之前分析过PowerJob的时间轮源码,感兴趣的可以查看《较真儿学源码系列-PowerJob时间轮源码分析》


1 简介

        MapReduce是一种编程模型,以及在集群上使用并行、分布式算法处理和生成大数据集的相关实现。
        一个MapReduce程序由一个map过程和reduce方法组成,map过程执行过滤和排序(例如按名字将学生分成不同的队列,每个名字一个队列),reduce方法执行聚合操作(例如计算每个队列中的学生人数,得出姓名的频率)。“MapReduce系统”(也称为“基础设施”或“框架”)通过调度分布式服务器、并行运行各种任务、管理系统中各部分之间的所有通信和数据传输以及提供冗余和容错机制来协调处理过程。
        该模型是数据分拆-应用-合并策略的特化版,用于数据分析;其灵感来自函数式编程中常用的map和reduce函数,但在MapReduce框架中,它们的目的与其原始形式并不相同。MapReduce框架的关键贡献并不在于实际的map和reduce函数(这些函数类似于1995年消息传递接口标准中的reduce和scatter操作),而在于通过并行化实现的各种应用程序的可扩展性和容错性。因此,单线程的MapReduce实现通常不会比传统的(非MapReduce)实现更快;任何性能提升通常只能在多线程的多处理器硬件上实现。只有当优化分布式shuffle操作(这降低了网络通信成本)和MapReduce框架的容错功能发挥作用时,使用这种模型才有益。优化通信成本是设计一个好的MapReduce算法的必要条件。
        MapReduce库已经在许多编程语言中实现,并具有不同程度的优化。Apache Hadoop是一个流行的开源实现,支持分布式shuffle操作。MapReduce最初指的是Google的专有技术,但后来被通用化。到2014年,Google不再使用MapReduce作为其大数据处理的主要模型,而Apache Mahout的开发则转向更强大且更少依赖磁盘的机制,这些机制包含了完整的map和reduce功能。

        以上是MapReduce的解释,而MapReduce框架(或系统)通常由三个操作(或步骤)组成:

  1. Map:每个工作节点将map函数应用于本地数据,并将输出写入临时存储器。master节点确保仅处理一份冗余输入数据的副本。
  2. Shuffle:工作节点基于输出键(由map函数产生)重新分配数据,使得属于一个键的所有数据都位于同一个工作节点上。
  3. Reduce:现在,工作节点并行处理每个键的输出数据组。

        而在PowerJob中也提供了类似MapReduce的功能,在面对大数据量的离线任务时会很合适。


2 使用

        我们使用PowerJob的MapReduce功能,需要实现MapReduceProcessor接口。

        可以看到,MapReduceProcessor接口继承了MapProcessor接口,而MapProcessor接口又继承了BasicProcessor接口。MapProcessor中含有默认的map方法,调用该方法即可完成分发子任务的过程。而实现MapReduceProcessor接口的类中需要同时实现reduce方法,完成最后的汇总过程。具体的使用如下所示:

/**
 * MapReduce 处理器示例
 * 控制台参数:{"batchSize": 100, "batchNum": 2}
 *
 * @author tjq
 * @since 2020/4/17
 */
@Slf4j
@Component
public class MapReduceProcessorDemo implements MapReduceProcessor {

    @Override
    public ProcessResult process(TaskContext context) throws Exception {

        OmsLogger omsLogger = context.getOmsLogger();

        log.info("============== TestMapReduceProcessor#process ==============");
        log.info("isRootTask:{}", isRootTask());
        log.info("taskContext:{}", JsonUtils.toJSONString(context));

        // 根据控制台参数获取MR批次及子任务大小
        final JSONObject jobParams = JSONObject.parseObject(context.getJobParams());

        Integer batchSize = (Integer) jobParams.getOrDefault("batchSize", 100);
        Integer batchNum = (Integer) jobParams.getOrDefault("batchNum", 10);

        if (isRootTask()) {
            log.info("==== MAP ====");
            omsLogger.info("[DemoMRProcessor] start root task~");
            List<TestSubTask> subTasks = Lists.newLinkedList();
            for (int j = 0; j < batchNum; j++) {
                for (int i = 0; i < batchSize; i++) {
                    int x = j * batchSize + i;
                    subTasks.add(new TestSubTask("name" + x, x));
                }
                map(subTasks, "MAP_TEST_TASK");
                subTasks.clear();
            }
            omsLogger.info("[DemoMRProcessor] map success~");
            return new ProcessResult(true, "MAP_SUCCESS");
        } else {
            log.info("==== NORMAL_PROCESS ====");
            omsLogger.info("[DemoMRProcessor] process subTask: {}.", JSON.toJSONString(context.getSubTask()));
            log.info("subTask: {}", JsonUtils.toJSONString(context.getSubTask()));
            Thread.sleep(1000);
            if (context.getCurrentRetryTimes() == 0) {
                return new ProcessResult(false, "FIRST_FAILED");
            } else {
                return new ProcessResult(true, "PROCESS_SUCCESS");
            }
        }
    }

    @Override
    public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) {
        log.info("================ MapReduceProcessorDemo#reduce ================");
        log.info("TaskContext: {}", JSONObject.toJSONString(context));
        log.info("List<TaskResult>: {}", JSONObject.toJSONString(taskResults));
        context.getOmsLogger().info("MapReduce job finished, result is {}.", taskResults);

        boolean success = true;
        return new ProcessResult(success, context + ": " + success);
    }

    @Getter
    @ToString
    @NoArgsConstructor
    @AllArgsConstructor
    public static class TestSubTask {
        private String name;
        private int age;
    }
}

        以上是官方的例子,从中可以看到,根任务和子任务都会执行process方法,其中会使用到isRootTask方法来区分根任务和子任务。根任务会调用到map方法来分发子任务,而子任务才会执行实际的业务逻辑。reduce方法则会汇总所有子任务的执行结果。在控制台创建的任务如下:

        而如果只想使用map过程,而不关心reduce、即子任务汇总上来的执行结果的话,则只需要实现MapProcessor接口就行了。


3 map方法

        根任务会调用到map方法来分发子任务,查看其实现:

/**
 * MapProcessor:
 * 分发子任务
 *
 * @param taskList 子任务,再次执行时可通过 TaskContext#getSubTask 获取
 * @param taskName 子任务名称,即子任务处理器中 TaskContext#getTaskName 获取到的值
 * @throws PowerJobCheckedException map 失败将抛出异常
 */
default void map(List<?> taskList, String taskName) throws PowerJobCheckedException {

    if (CollectionUtils.isEmpty(taskList)) {
        return;
    }

    //从缓存中获取任务
    TaskDO task = ThreadLocalStore.getTask();
    WorkerRuntime workerRuntime = ThreadLocalStore.getRuntimeMeta();

    if (taskList.size() > RECOMMEND_BATCH_SIZE) {
        log.warn("[Map-{}] map task size is too large, network maybe overload... please try to split the tasks.", task.getInstanceId());
    }

    // 修复 map 任务命名和根任务名或者最终任务名称一致导致的问题(无限生成子任务或者直接失败)
    //如果任务名是特殊名称的话(OMS_ROOT_TASK/OMS_LAST_TASK),需要改写,加上“X-”前缀
    if (TaskConstant.ROOT_TASK_NAME.equals(taskName) || TaskConstant.LAST_TASK_NAME.equals(taskName)) {
        log.warn("[Map-{}] illegal map task name : {}! please do not use 'OMS_ROOT_TASK' or 'OMS_LAST_TASK' as map task name. as a precaution, it will be renamed 'X-{}' automatically.", task.getInstanceId(), taskName, taskName);
        taskName = "X-" + taskName;
    }

    // 1. 构造请求
    ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName);

    // 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常)
    boolean requestSucceed = TransportUtils.reliableMapTask(req, task.getAddress(), workerRuntime);

    if (requestSucceed) {
        log.info("[Map-{}] map task[name={},num={}] successfully!", task.getInstanceId(), taskName, taskList.size());
    } else {
        throw new PowerJobCheckedException("map failed for task: " + taskName);
    }
}

/**
 * ProcessorMapTaskRequest:
 * 第31行代码处:
 */
public ProcessorMapTaskRequest(TaskDO parentTask, List<?> subTaskList, String taskName) {

    this.instanceId = parentTask.getInstanceId();
    this.subInstanceId = parentTask.getSubInstanceId();
    this.taskName = taskName;
    this.subTasks = Lists.newLinkedList();

    subTaskList.forEach(subTask -> {
        // 同一个 Task 内部可能多次 Map,因此还是要确保线程级别的唯一
        String subTaskId = parentTask.getTaskId() + "." + ThreadLocalStore.getTaskIDAddr().getAndIncrement();
        // 写入类名,方便反序列化
        subTasks.add(new SubTask(subTaskId, SerializerUtils.serialize(subTask)));
    });
}

/**
 * TransportUtils:
 * 第34行代码处:
 */
public static boolean reliableMapTask(ProcessorMapTaskRequest req, String address, WorkerRuntime workerRuntime) throws PowerJobCheckedException {
    try {
        return reliableAsk(ServerType.WORKER, WTT_PATH, WTT_HANDLER_MAP_TASK, address, req, workerRuntime.getTransporter()).isSuccess();
    } catch (Throwable throwable) {
        throw new PowerJobCheckedException(throwable);
    }
}

/**
 * TaskTrackerActor:
 * 第68行代码处:
 * 子任务 map 处理器
 */
@Handler(path = WTT_HANDLER_MAP_TASK)
public AskResponse onReceiveProcessorMapTaskRequest(ProcessorMapTaskRequest req) {

    //获取taskTracker
    HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId());
    if (taskTracker == null) {
        log.warn("[TaskTrackerActor] receive ProcessorMapTaskRequest({}) but system can't find TaskTracker.", req);
        return null;
    }

    boolean success = false;
    List<TaskDO> subTaskList = Lists.newLinkedList();

    try {

        req.getSubTasks().forEach(originSubTask -> {
            TaskDO subTask = new TaskDO();

            //这里需要特别留意下,实际上是用调用map方法时传进来的任务名来生成子任务的
            subTask.setTaskName(req.getTaskName());
            subTask.setSubInstanceId(req.getSubInstanceId());

            subTask.setTaskId(originSubTask.getTaskId());
            subTask.setTaskContent(originSubTask.getTaskContent());

            subTaskList.add(subTask);
        });

        success = taskTracker.submitTask(subTaskList);
    } catch (Exception e) {
        log.warn("[TaskTrackerActor] process map task(instanceId={}) failed.", req.getInstanceId(), e);
    }

    AskResponse response = new AskResponse();
    response.setSuccess(success);
    return response;
}

/**
 * HeavyTaskTracker:
 * 第107行代码处:
 * 提交Task任务(MapReduce的Map,Broadcast的广播),上层保证 batchSize,同时插入过多数据可能导致失败
 *
 * @param newTaskList 新增的子任务列表
 */
public boolean submitTask(List<TaskDO> newTaskList) {
    if (finished.get()) {
        return true;
    }
    if (CollectionUtils.isEmpty(newTaskList)) {
        return true;
    }
    // 基础处理(多循环一次虽然有些浪费,但分布式执行中,这点耗时绝不是主要占比,忽略不计!)
    newTaskList.forEach(task -> {
        task.setInstanceId(instanceId);
        //将状态设置为“等待调度器调度”
        task.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
        task.setFailedCnt(0);
        task.setLastModifiedTime(System.currentTimeMillis());
        task.setCreatedTime(System.currentTimeMillis());
        task.setLastReportTime(-1L);
    });

    log.debug("[TaskTracker-{}] receive new tasks: {}", instanceId, newTaskList);
    //这里就是在往task_info表中插入数据
    return taskPersistenceService.batchSave(newTaskList);
}

        可以看到,map方法的作用就是根据根任务创建了一堆子任务,这些子任务的名称都是调用map方法时传进来的任务名。最后,子任务全都持久化到task_info表中。那么这些数据是什么时候会被执行的呢?我们先带着这个疑惑。

        之前我们在《较真儿学源码系列-PowerJob启动流程源码分析》的第2.3.1小节中分析过,服务端启动的时候,会执行一些初始化任务。其中在PowerScheduleService.scheduleNormalJob方法中会调用到HeavyTaskTracker.create方法。其中又会开启三个定时任务(StatusCheckRunnable、WorkerDetector和Dispatcher)。之前我们只分析了常规任务的StatusCheckRunnable和Dispatcher方法的执行逻辑,接下来就来分析下在MapReduce任务的实现下有什么不一样的地方:

3.1 StatusCheckRunnable

/**
 * StatusCheckRunnable:
 */
@SuppressWarnings("squid:S3776")
private void innerRun() {

    //获取任务实例产生的各个Task状态,用于分析任务实例执行情况
    InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);

    long finishedNum = holder.succeedNum + holder.failedNum;
    long unfinishedNum = holder.waitingDispatchNum + holder.workerUnreceivedNum + holder.receivedNum + holder.runningNum;

    log.debug("[TaskTracker-{}] status check result: {}", instanceId, holder);

    //组装上报参数
    //...

    boolean success = false;
    String result = null;

    // 2. 如果未完成任务数为0,判断是否真正结束,并获取真正结束任务的执行结果
    if (unfinishedNum == 0) {

        // 数据库中一个任务都没有,说明根任务创建失败,该任务实例失败
        if (finishedNum == 0) {
            finished.set(true);
            result = SystemInstanceResult.TASK_INIT_FAILED;
        } else {
            ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());

            switch (executeType) {

                case STANDALONE:
                    //...
                    break;
                // MAP 不关心结果,最简单
                case MAP:
                    finished.set(true);
                    success = holder.failedNum == 0;
                    result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum);
                    break;
                // MapReduce 和 Broadcast 任务实例是否完成根据**LastTask**的执行情况判断
                default:

                    //MapReduce任务下查找是否含有最终任务(代码执行到这里,unfinishedNum == 0 并且finishedNum != 0,说明所有的根任务和子任务都执行完了)
                    Optional<TaskDO> lastTaskOptional = taskPersistenceService.getLastTask(instanceId, instanceId);
                    if (lastTaskOptional.isPresent()) {

                        // 存在则根据 reduce 任务来判断状态
                        TaskDO resultTask = lastTaskOptional.get();
                        TaskStatus lastTaskStatus = TaskStatus.of(resultTask.getStatus());

                        if (lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) {
                            finished.set(true);
                            success = lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS;
                            result = resultTask.getResult();
                        }

                    } else {

                        // 不存在,代表前置任务刚刚执行完毕,需要创建 lastTask,最终任务必须在本机执行!
                        //最后会创建一个最终任务,该任务是来执行reduce方法的
                        TaskDO newLastTask = new TaskDO();
                        //这里需要留意下,最终任务的名称为OMS_LAST_TASK(LAST_TASK_NAME)
                        newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME);
                        newLastTask.setTaskId(LAST_TASK_ID);
                        newLastTask.setSubInstanceId(instanceId);
                        newLastTask.setAddress(workerRuntime.getWorkerAddress());
                        //task_info表中插入数据
                        submitTask(Lists.newArrayList(newLastTask));
                    }
            }
        }
    }

    //...
}

        之前我们分析过StatusCheckRunnable的innerRun方法,但是没有分析过MapReduce的处理。由上可以看到,因为Map任务不关心处理结果,所以没有什么具体的逻辑,不会上报结果;而MapReduce则不一样,在所有根任务和子任务都执行完了后,会查找有没有最终任务。有则判断其执行状态,没有则会新建(最终任务是用来执行reduce任务的)。

3.2 WorkerDetector

        WorkerDetector是Map/MapReduce任务中检查是否需要注册新的worker节点用的:

/**
 * WorkerDetector:
 */
@Override
public void run() {

    //检查是否需要动态加载新的执行器
    boolean needMoreWorker = ptStatusHolder.checkNeedMoreWorker();
    log.info("[TaskTracker-{}] checkNeedMoreWorker: {}", instanceId, needMoreWorker);
    //不需要就直接退出,不做任何处理
    if (!needMoreWorker) {
        return;
    }

    //获取当前服务端地址
    final String currentServerAddress = workerRuntime.getServerDiscoveryService().getCurrentServerAddress();
    if (StringUtils.isEmpty(currentServerAddress)) {
        log.warn("[TaskTracker-{}] no server available, won't start worker detective!", instanceId);
        return;
    }

    try {
        WorkerQueryExecutorClusterReq req = new WorkerQueryExecutorClusterReq(workerRuntime.getAppId(), instanceInfo.getJobId());
        AskResponse response = TransportUtils.reliableQueryJobCluster(req, currentServerAddress, workerRuntime.getTransporter());
        if (!response.isSuccess()) {
            log.warn("[TaskTracker-{}] detective failed due to ask failed, message is {}", instanceId, response.getMessage());
            return;
        }

        List<String> workerList = JsonUtils.parseObject(response.getData(), new TypeReference<List<String>>() {
        });
        ptStatusHolder.register(workerList);
    } catch (Exception e) {
        log.warn("[TaskTracker-{}] detective failed, currentServer: {}", instanceId, currentServerAddress, e);
    }
}

/**
 * ProcessorTrackerStatusHolder:
 * 第8行代码处:
 * 检查是否需要动态加载新的执行器
 *
 * @return check need more workers
 */
public boolean checkNeedMoreWorker() {
    if (endlessWorkerNum()) {
        return true;
    }
    //可用ProcessorTracker的数量是否小于最大worker数量
    return getAvailableProcessorTrackers().size() < maxWorkerCount;
}

/**
 * 第46行代码处:
 */
private boolean endlessWorkerNum() {
    //最大worker数量是否为空
    return maxWorkerCount == null || maxWorkerCount == 0;
}

/**
 * TransportUtils:
 * 第24行代码处:
 */
@SneakyThrows
public static AskResponse reliableQueryJobCluster(WorkerQueryExecutorClusterReq req, String address, Transporter transporter) {
    return reliableAsk(ServerType.SERVER, S4W_PATH, S4W_HANDLER_QUERY_JOB_CLUSTER, address, req, transporter);
}

/**
 * AbWorkerRequestHandler:
 * 第67行代码处:
 */
@Override
@Handler(path = S4W_HANDLER_QUERY_JOB_CLUSTER, processType = ProcessType.BLOCKING)
public AskResponse processWorkerQueryExecutorCluster(WorkerQueryExecutorClusterReq req) {
    AskResponse askResponse;

    Long jobId = req.getJobId();
    Long appId = req.getAppId();

    JobInfoRepository jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class);
    //从任务表中获取任务
    Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
    if (jobInfoOpt.isPresent()) {
        JobInfoDO jobInfo = jobInfoOpt.get();
        if (!jobInfo.getAppId().equals(appId)) {
            askResponse = AskResponse.failed("Permission Denied!");
        } else {
            //获取合适的worker列表并返回
            List<String> sortedAvailableWorker = workerClusterQueryService.getSuitableWorkers(jobInfo)
                    .stream().map(WorkerInfo::getAddress).collect(Collectors.toList());
            askResponse = AskResponse.succeed(sortedAvailableWorker);
        }
    } else {
        askResponse = AskResponse.failed("can't find jobInfo by jobId: " + jobId);
    }
    return askResponse;
}

/**
 * ProcessorTrackerStatusHolder:
 * 第32行代码处:
 */
public void register(List<String> workerIpList) {
    if (endlessWorkerNum()) {
        //如果当前没有worker,则根据worker列表一一注册
        workerIpList.forEach(this::registerOne);
        return;
    }
    //获取可以派发任务的 ProcessorTracker
    List<String> availableProcessorTrackers = getAvailableProcessorTrackers();
    int currentWorkerSize = availableProcessorTrackers.size();
    int needMoreNum = maxWorkerCount - currentWorkerSize;
    if (needMoreNum <= 0) {
        return;
    }

    log.info("[PTStatusHolder-{}] currentWorkerSize: {}, needMoreNum: {}", instanceId, currentWorkerSize, needMoreNum);

    //在不超过最大worker数量的前提下,新注册worker(添加缓存)
    for (String newIp : workerIpList) {
        boolean success = registerOne(newIp);
        if (success) {
            needMoreNum--;
        }
        if (needMoreNum <= 0) {
            return;
        }
    }
}

/**
 * 第108行代码处和第123行代码处:
 * 注册新的执行节点
 * 添加缓存
 *
 * @param address 新的执行节点地址
 * @return true: register successfully / false: already exists
 */
private boolean registerOne(String address) {
    ProcessorTrackerStatus pts = address2Status.get(address);
    if (pts != null) {
        return false;
    }
    pts = new ProcessorTrackerStatus();
    pts.init(address);
    address2Status.put(address, pts);
    log.info("[PTStatusHolder-{}] register new worker: {}", instanceId, address);
    return true;
}

3.3 Dispatcher

        之前分析过,在Dispatcher的run方法中会获取等待调度器调度的任务,这其中就包含了上面调用map方法生成的子任务。这些子任务最终会包装成HeavyProcessorRunnable线程,调用到其中的innerRun方法:

/**
 * HeavyProcessorRunnable:
 */
public void innerRun() throws InterruptedException {

    //获取执行处理器
    final BasicProcessor processor = processorBean.getProcessor();

    String taskId = task.getTaskId();
    Long instanceId = task.getInstanceId();

    log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName());
    //缓存
    ThreadLocalStore.setTask(task);
    ThreadLocalStore.setRuntimeMeta(workerRuntime);

    // 0. 构造任务上下文
    WorkflowContext workflowContext = constructWorkflowContext();
    TaskContext taskContext = constructTaskContext();
    taskContext.setWorkflowContext(workflowContext);
    // 1. 上报执行信息
    reportStatus(TaskStatus.WORKER_PROCESSING, null, null, null);

    ProcessResult processResult;
    ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());

    // 2. 根任务 & 广播执行 特殊处理
    if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName()) && executeType == ExecuteType.BROADCAST) {
        // 广播执行:先选本机执行 preProcess,完成后 TaskTracker 再为所有 Worker 生成子 Task
        handleBroadcastRootTask(instanceId, taskContext);
        return;
    }

    // 3. 最终任务特殊处理(一定和 TaskTracker 处于相同的机器)
    if (TaskConstant.LAST_TASK_NAME.equals(task.getTaskName())) {
        handleLastTask(taskId, instanceId, taskContext, executeType);
        return;
    }

    // 4. 正式提交运行
    try {
        //这里的process方法也就是我们自己写的业务方法
        processResult = processor.process(taskContext);
        if (processResult == null) {
            processResult = new ProcessResult(false, "ProcessResult can't be null");
        }
    } catch (Throwable e) {
        log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e);
        processResult = new ProcessResult(false, e.toString());
    }
    //上报执行结果
    reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null, workflowContext.getAppendedContextData());
}

        innerRun方法我们之前分析过,可以看到,子任务和根任务的执行没有区别,都会最终调用到我们自己写的process方法中来。process方法中会根据isRootTask方法来区分到底是根任务执行还是子任务执行(isRootTask方法就是根据任务名来进行区分的:根任务的任务名写死为OMS_ROOT_TASK,而子任务的任务名为根任务调用map方法传进来的任务名)。


4 reduce方法

        在上面第3.3小节的HeavyProcessorRunnable线程的innerRun方法中的第36行代码处,最终任务的执行有特殊处理,我们来看下其实现:

/**
 * HeavyProcessorRunnable:
 * 处理最终任务
 * BROADCAST  => {@link BroadcastProcessor#postProcess}
 * MAP_REDUCE => {@link MapReduceProcessor#reduce}
 */
private void handleLastTask(String taskId, Long instanceId, TaskContext taskContext, ExecuteType executeType) {
    final BasicProcessor processor = processorBean.getProcessor();
    ProcessResult processResult;
    Stopwatch stopwatch = Stopwatch.createStarted();
    log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId);

    //从任务表中获取所有子任务的执行结果
    List<TaskResult> taskResults = workerRuntime.getTaskPersistenceService().getAllTaskResult(instanceId, task.getSubInstanceId());
    try {
        switch (executeType) {
            case BROADCAST:

                if (processor instanceof BroadcastProcessor) {
                    BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
                    processResult = broadcastProcessor.postProcess(taskContext, taskResults);
                } else {
                    processResult = BroadcastProcessor.defaultResult(taskResults);
                }
                break;
            case MAP_REDUCE:

                if (processor instanceof MapReduceProcessor) {
                    MapReduceProcessor mapReduceProcessor = (MapReduceProcessor) processor;
                    //这里就可以看到,最终任务不会执行process方法,而是执行reduce方法
                    processResult = mapReduceProcessor.reduce(taskContext, taskResults);
                } else {
                    processResult = new ProcessResult(false, "not implement the MapReduceProcessor");
                }
                break;
            default:
                processResult = new ProcessResult(false, "IMPOSSIBLE OR BUG");
        }
    } catch (Throwable e) {
        processResult = new ProcessResult(false, e.toString());
        log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", instanceId, taskId, e);
    }

    TaskStatus status = processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
    //上报执行结果
    reportStatus(status, suit(processResult.getMsg()), null, taskContext.getWorkflowContext().getAppendedContextData());

    log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch);
}

        在上面第30行代码处就可以看到,最终任务不会执行process方法,而是执行reduce方法,以此来完成最后的汇总过程。而最终任务的创建是在上面第3.1小节的StatusCheckRunnable线程中,定时会去检测是否有最终任务,没有就新建。


5 执行流程

        上面说的有点乱(主要不是一条主线完整执行下来,而是有很多的线程异步的执行),总结一下核心的执行流程:

1.在PowerJob控制台中新创建一个MapReduce任务,会持久化到任务表中;

2.服务端启动的时候会开启一些定时任务,其中会找到这些新的待调度的任务(根任务),并执行其中的process业务方法;

3.根任务的process方法中会调用到map方法,将根任务分成多个子任务,持久化到任务表中;

4.这个时候定时任务就会扫到这些子任务,执行其中的process方法;

5.子任务的process方法才是真正的业务处理逻辑;

6.与此同时,还有另外一个定时任务,会定时检查MapReduce任务中的根任务和子任务是否都执行完了。如果都执行完了的话,会查找是否含有最终任务,没有则新建;

7.最终任务也会通过定时任务的扫描去执行,只不过此时执行的是reduce方法,而不是process方法。reduce方法中汇总了所有子任务的执行结果。


原创不易,未得准许,请勿转载,翻版必究

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1047122.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL学习笔记18

MySQL的备份与恢复&#xff1a; 制定数据库备份策略进行备份&#xff0c;并且把数据导入到测试环境。 核心技术&#xff1a; 1&#xff09;掌握MySQL的备份工具使用及各自特点&#xff1b; 2&#xff09;熟悉Shell脚本&#xff1b; 3&#xff09;熟悉MySQL数据的导入导出&a…

苹果 CMS 原生 Java 白菜影视 App 源码【带打包教程】

苹果 CMS 原生 Java 白菜影视 App 源码是一款功能强大的影视应用程序&#xff0c;支持画中画、投屏、点播、播放前广告和支持普通解析等多种功能。与萝卜 App 源码相比&#xff0c;该套源码更加稳定&#xff0c;且拥有画中画投屏和自定义广告等功能&#xff0c;提高了安全性。 …

PMP考前学习计划

很多小伙伴在刚刚接触到PMBOK时&#xff0c;无从下手&#xff0c;也不知道如何合理地安排自己的学习时间&#xff0c;没有一个学习计划作为指导。 今天我就给大家分享一份详细的PMP考前学习计划&#xff0c;这份计划整理并无私分享&#xff0c;欢迎大家分享给身边备考PMP的同事…

Java分支结构:一次不经意的选择,改变了我的一生。

&#x1f451;专栏内容&#xff1a;Java⛪个人主页&#xff1a;子夜的星的主页&#x1f495;座右铭&#xff1a;前路未远&#xff0c;步履不停 目录 一、顺序结构二、分支结构1、if语句2、switch语句 好久不见&#xff01;命运之轮常常在不经意间转动&#xff0c;有时一个看似微…

SEO的优化教程(百度SEO的介绍和优化)

百度SEO关键字介绍&#xff1a; 百度SEO关键字是指用户在搜索引擎上输入的词语&#xff0c;是搜索引擎了解网站内容和相关性的重要因素。百度SEO关键字可以分为短尾词、中尾词和长尾词&#xff0c;其中长尾词更具有针对性和精准性&#xff0c;更易于获得高质量的流量。蘑菇号-…

【MATLAB源码-第39期】基于m序列/gold序列的直接扩频通信仿真,编码方式采用卷积码,调制方式采用BPSK。

1、算法描述 直接序列扩频通信系统的仿真一般包括以下几个主要步骤&#xff1a;信号产生、扩频、卷积编码、BPSK调制、信道传输、BPSK解调、卷积码译码和解扩。 信号产生&#xff1a; 首先&#xff0c;产生一个二进制数据序列作为待发送的信息位。 扩频&#xff1a; 采用m序列…

windows11系统没有系统散热方式的解决办法

一、问题描述 当我们查看Win11系统的&#xff08;同时按下键盘的WinR键即可打开运行窗口&#xff09;【控制面板】-->【硬件和声音】-->【电源选项】-->【更改计划设置】-->【 更改高级电源设置】-->【处理器电源管理】下没有系统散热方式的选项&#xff0c;如下…

黑马JVM总结(二十五)

&#xff08;1&#xff09;字节码指令-cinit 构造方法可以分为两类&#xff0c;一类是cinit 一类init cinit是整个类的构造方法 putstatic&#xff1a;进行static变量的赋值&#xff0c;是到常量池里找到名字一个叫做i的变量 &#xff08;2&#xff09;字节码指令-init in…

Anchors

这是源代码定义的anchors概念&#xff1a; 实现过程&#xff1a; 假如有一张500500的图片&#xff0c;那么经过第一步深度卷积网络之后&#xff08;4次池化&#xff09;&#xff0c;最终就会变成一个3232的特征&#xff1a; 在开源代码实现里面&#xff1a; 所以经过卷积完之后…

word中给公式加序号的方法

①首先&#xff0c;用word插入一个公式 然后呢&#xff0c;在公式后面敲上这个公式在整篇文章中的序号。我的这个公式在整篇文章中是第三号&#xff0c;所以就敲上(3),如下图所示&#xff1a; 然后&#xff0c;在公式和序号之间&#xff0c;按住shift3(#) 切忌&#xff0c;…

makdown文法

这里写自定义目录标题 欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants 创建一个自定义列表如何创建一个…

【Win11 搭建miniconda 的pytorch1.12环境】

请不要质疑我一直在水文章&#xff0c;因为我电脑被格式化了&#xff0c;需求又变了&#xff0c;这不得多多与时代接轨哦&#xff01; 为我的GRCNN抓取打基础&#xff0c;之前是在Ubuntu上跑&#xff1a;【机械臂视觉抓取从理论到实战】&#xff0c;没错现在就是在WIN11上跑&am…

《数据结构、算法与应用C++语言描述》使用C++语言实现数组栈

《数据结构、算法与应用C语言描述》使用C语言实现数组栈 定义 栈的定义 把线性表的插入和删除操作限制在同一端进行&#xff0c;就得到栈数据结构。因此&#xff0c;栈是一个后进先出&#xff08;last-in-first-out&#xff0c;LIFO&#xff09;的数据结构。 栈&#xff08…

测试用例的八大基本准则

测试用例的八大基本准则 测试用例的八大基本准则功能测试性能测试兼容性测试安全测试可靠性测试易用性测试数据库测试接口测试 测试案例 测试用例的八大基本准则 上节测试用例的设计中我们讨论如何设计一个测试用例&#xff0c;知道了测试用例的设计有&#xff1a;“边界值&am…

【搭建yolox深度学习环境】

这里写目录标题 一、环境配置二、安装所需库文件2.1 安装apex 一、环境配置 首先进行yolox模型的下载&#xff1a;YOLOX(gitcode) 或者YOLOX(github) 并选择自己所需的权重模型&#xff0c;如-s-m-l等&#xff08;.pth文件&#xff09; 如果需要进行labview的使用&#xff0c;…

正则表达式贪婪模式和非贪婪模式

一、贪婪模式 贪婪模式表示尽可能多的匹配字符串&#xff0c;正则表达式六个量词元字符?、、*、{n}、{n,m}、{n,}默认是贪婪模式 接下来引入一个场景来分析说明 获取html a标签href属性的值 <a href"https://www.baidu.com/" attr"abc"></a>…

可控情感的表现力语音驱动面部动画合成

高度逼真的面部动画生成需求量很大&#xff0c;但目前仍然是一项具有挑战性的任务。现有的语音驱动面部动画方法可以产生令人满意的口部运动和嘴唇同步&#xff0c;但在表现力情感表达和情感控制的灵活性方面仍存在不足。本文提出了一种基于深度学习的新方法&#xff0c;用于从…

stm32之1602+DHT11+继电器

描述&#xff1a; 1、DHT11监测温室度&#xff0c;并显示到1602液晶上 2、通过串口打印&#xff08;或通过蓝牙模块在手机上查看&#xff09; 3、当温度大于24度时&#xff0c;开启继电器。小于时关闭继电器&#xff08;继电器可连接风扇---假想O(∩_∩)O哈哈~&#xff09; 一、…

【Axure教程】用中继器制作双坐标柱状折线图

双坐标柱状折线图常用于同时展示两组数据的图表类型&#xff0c;每组数据都有自己的纵坐标轴&#xff08;Y轴&#xff09;。一组数据通常用柱状图表示&#xff0c;而另一组数据则用折线图表示。这种图表类型有助于比较两组数据之间的关系和趋势。 那今天作者就教大家&#xff…

服务断路器_Resilience4j限流

限流YML配置 ratelimiter:instances:backendA:# 限流周期时长。 默认&#xff1a;500纳秒limitRefreshPeriod: 5s# 周期内允许通过的请求数量。 默认&#xff1a;50limitForPeriod: 2先写Controller /*** 限流* return*/GetMapping("/limiter")RateLimiter(nam…