Hadoop-MapReduce-YarnChild启动篇

news2024/11/18 19:29:45

 一、源码下载

下面是hadoop官方源码下载地址,我下载的是hadoop-3.2.4,那就一起来看下吧

Index of /dist/hadoop/core

二、上下文

在上一篇<Hadoop-MapReduce-MRAppMaster启动篇>中已经将到:MRAppMaster的启动,那么运行MapTask、ReduceTask的容器(YarnChild)是怎么启动的呢?接下来我们一起来看看

三、结论

MRJobConfig是一个MRJob的配置,里面包含了Map、Reduce、Combine类以及Job名称、用户名称、队列名称、MapTask数量、ReduceTask数量、工作目录,jar在本地的路径、任务超时时间、任务id、输入输出目录,每个任务的内存大小和cpu核数等等。

此外它里面还有一个属性,如下:

package org.apache.hadoop.mapreduce;
public interface MRJobConfig {
    //......省略......

    public static final String APPLICATION_MASTER_CLASS =
          "org.apache.hadoop.mapreduce.v2.app.MRAppMaster";

    public static final String MAPREDUCE_V2_CHILD_CLASS = 
          "org.apache.hadoop.mapred.YarnChild";
    //......省略......
}

MRAppMaster是MapReduce的ApplicationMaster实现,负责整个MapReduce作业的过程调度和状态协调

YarnChid是运行在每个容器中的进程,负责运行某一个MapTask或者ReduceTask,

有兴趣的同学可以看一个任务的Yarn日志,也可以看我的<Hadoop-MapReduce-跟着日志理解整体流程>一篇中的日志,就可以发现ApplicationMaster容器和MapTask、ReduceTask所在容器的的日志开头分别就是MRAppMaster和YarnChid

MRAppMaster的启动参数是在YARNRunner中配置的:

public class YARNRunner implements ClientProtocol {
    private List<String> setupAMCommand(Configuration jobConf) {
    List<String> vargs = new ArrayList<>(8);
    vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
        + "/bin/java");

    //......省略......

    vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);

    //......省略......

    return vargs;
  }
}

YarnChid的启动参数是在MapReduceChildJVM中配置的:

public class MapReduceChildJVM {
  public static List<String> getVMCommand(
      InetSocketAddress taskAttemptListenerAddr, Task task, 
      JVMId jvmID) {

    TaskAttemptID attemptID = task.getTaskID();
    JobConf conf = task.conf;

    Vector<String> vargs = new Vector<String>(8);

    vargs.add(MRApps.crossPlatformifyMREnv(task.conf, Environment.JAVA_HOME)
        + "/bin/java");

    //......省略......

    vargs.add(YarnChild.class.getName());  // main of Child
    
    //......省略......

    return vargsFinal;
  }
}

YarnChid启动后会启动MapTask或者ReduceTask

四、调用细节(源码跟读)

1、MRAppMaster

MRAppMaster是Map Reduce应用程序母版。状态机被封装在Job接口的实现中。所有状态更改都通过作业界面进行。每个事件都会导致作业中的有限状态转换。

MR AppMaster是松散耦合服务的组合。服务之间通过事件进行交互。这些组件类似于Actors模型。该组件对接收到的事件进行操作,并将事件发送到其他组件。

这使它保持高度并发性,而不需要同步或只需要最少的同步。

事件由中央调度机制进行调度。所有组件都注册到Dispatcher。

使用AppContext在不同组件之间共享信息。

我们先从MRAppMaster的main方法开始捋

public static void main(String[] args) {
    try {
      mainStarted = true;
      //设置默认异常处理
      Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
      //获取容器相关信息:容器id、容器所在的NodeManager信息、应用提交时间
      String containerIdStr =
          System.getenv(Environment.CONTAINER_ID.name());
      String nodeHostString = System.getenv(Environment.NM_HOST.name());
      String nodePortString = System.getenv(Environment.NM_PORT.name());
      String nodeHttpPortString =
          System.getenv(Environment.NM_HTTP_PORT.name());
      String appSubmitTimeStr =
          System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
      
      //校验容器相关信息
      validateInputParam(containerIdStr,
          Environment.CONTAINER_ID.name());
      validateInputParam(nodeHostString, Environment.NM_HOST.name());
      validateInputParam(nodePortString, Environment.NM_PORT.name());
      validateInputParam(nodeHttpPortString,
          Environment.NM_HTTP_PORT.name());
      validateInputParam(appSubmitTimeStr,
          ApplicationConstants.APP_SUBMIT_TIME_ENV);

      ContainerId containerId = ContainerId.fromString(containerIdStr);
      //根据containerId 获取ApplicationAttemptId 
      //ContainerId:表示集群中容器的全局唯一标识符
      //ApplicationAttemptId:表示ApplicationMaster对给定ApplicationId的特定尝试,由于ApplicationMaster的临时故障,如硬件故障、连接问题等,在计划应用程序的节点上,可能需要多次尝试才能运行应用程序。
      ApplicationAttemptId applicationAttemptId =
          containerId.getApplicationAttemptId();
      if (applicationAttemptId != null) {
        CallerContext.setCurrent(new CallerContext.Builder(
            "mr_appmaster_" + applicationAttemptId.toString()).build());
      }
      long appSubmitTime = Long.parseLong(appSubmitTimeStr);
      
      //构建MRAppMaster
      MRAppMaster appMaster =
          new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
              Integer.parseInt(nodePortString),
              Integer.parseInt(nodeHttpPortString), appSubmitTime);
      //在JVM正常关闭期间接收到AND信号时运行的关闭挂钩。
      ShutdownHookManager.get().addShutdownHook(
        new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
      //构建 map/reduce 作业配置
      JobConf conf = new JobConf(new YarnConfiguration());
      //添加配置资源(启动MRAppMaster 时配置过)
      conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
      
      MRWebAppUtil.initialize(conf);
      //记录系统属性
      String systemPropsToLog = MRApps.getSystemPropertiesToLog(conf);
      if (systemPropsToLog != null) {
        LOG.info(systemPropsToLog);
      }

      String jobUserName = System
          .getenv(ApplicationConstants.Environment.USER.name());
      conf.set(MRJobConfig.USER_NAME, jobUserName);
      //初始化并启动该作业的AppMaster
      initAndStartAppMaster(appMaster, conf, jobUserName);
    } catch (Throwable t) {
      LOG.error("Error starting MRAppMaster", t);
      ExitUtil.terminate(1, t);
    }
  }

下面我们接着看initAndStartAppMaster()

protected static void initAndStartAppMaster(final MRAppMaster appMaster,
      final JobConf conf, String jobUserName) throws IOException,
      InterruptedException {
    //设置UGI的静态配置
    UserGroupInformation.setConfiguration(conf);
    // MAPREDUCE-6565: 需要设置SecurityUtil的配置。
    SecurityUtil.setConfiguration(conf);
    //安全框架已经将令牌加载到当前的UGI中,只需使用它们
    Credentials credentials =
        UserGroupInformation.getCurrentUser().getCredentials();
    LOG.info("Executing with tokens: {}", credentials.getAllTokens());
    
    //使用登录名创建用户。它旨在用于RPC中的远程用户,因为它没有任何凭据。
    UserGroupInformation appMasterUgi = UserGroupInformation
        .createRemoteUser(jobUserName);
    //将给定的凭据添加到此用户。
    appMasterUgi.addCredentials(credentials);

    //现在删除AM->RM令牌,这样任务就没有它了
    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
    while (iter.hasNext()) {
      Token<?> token = iter.next();
      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
        iter.remove();
      }
    }
    //将所有凭据从一个凭据对象复制到另一个。现有的机密和令牌将被覆盖。
    conf.getCredentials().addAll(credentials);
    appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        //服务所需的所有初始化代码。
        //在特定服务实例的生命周期中,此方法只会被调用一次。
        //有兴趣的同学可以进去看看,我这里大致总结下它做了什么:
        //    1、创建作业类加载器
        //    2、获得作业所需的令牌,并将其放入UGI
        //    3、创建事件调度程序接口。它根据事件类型将事件分派给已注册的事件处理程序。
        //    4、将事件调度程序添加到管理的服务列表
        //    5、创建尝试任务完成监控(如果任务尝试在FINISHING状态下停留的时间过长,则此类会生成TA_TIMED_OUT。)
        //    6、将尝试任务完成监控添加到管理的服务列表
        //    7、根据尝试任务创建心的jobid
        //    8、判断是用新API还是旧API
        //    9、获取该作业输出格式的输出提交器。它负责确保正确提交输出。
        //    10、检查该作业在HDFS上的临时目录是否存在
        //    11、构建用于处理来自JobClient的请求的服务
        //    12、创建用于处理输出提交的服务并添加到管理的服务列表
        //    13、处理来自RM的抢占请求
        //    14、创建用于处理对TaskUmplicalProtocol的请求的服务并添加到管理的服务列表
        //    15、创建用于记录作业历史事件的服务,并注册到事件调度程序中
        //    16、创建该作业的事件调度服务并注册到事件调度程序中
        //    17、创建投机者组件事件调度服务并注册到事件调度程序中(任务尝试的状态更新将发送到此组件)并注册到事件调度程序
        //    18、启动临时目录清理程序
        //    19、构建从ResourceManager分配容器的服务(如果是uber模式,则是伪造容器)并注册到事件调度程序
        //    20、构建通过NodeManager启动分配容器的相应服务并注册到事件调度程序
        //    21、最后添加JobHistoryEventHandler并添加到管理的服务列表
        appMaster.init(conf);
        //启动该作业的AppMaster
        appMaster.start();
        if(appMaster.errorHappenedShutDown) {
          throw new IOException("Was asked to shut down.");
        }
        return null;
      }
    });
  }

下面我们接着看appMaster.start(),它最终还是会调用本类的serviceStart()

protected void serviceStart() throws Exception {

    amInfos = new LinkedList<AMInfo>();
    //因为作业有失败重试功能,假如这一次是重试作业旧需要覆盖上一次的Task并清理上一次的临时目录和输出
    completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>();
    processRecovery();
    cleanUpPreviousJobOutput();

    //当前AM生成的当前AMInfo(里面有AppAttemptId、开始时间、所在容器id、所在NodeManager域名和端口)
    AMInfo amInfo =
        MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
            nmPort, nmHttpPort);

    //创建并初始化(但不启动)单例job。
    //并将job完成事件和处理程序注册到事件调度程序中
    job = createJob(getConfig(), forcedState, shutDownMessage);

    //为所有以前的AM发送一个MR AM启动的事件。
    for (AMInfo info : amInfos) {
      dispatcher.getEventHandler().handle(
          new JobHistoryEvent(job.getID(), new AMStartedEvent(info
              .getAppAttemptId(), info.getStartTime(), info.getContainerId(),
              info.getNodeManagerHost(), info.getNodeManagerPort(), info
                  .getNodeManagerHttpPort(), appSubmitTime)));
    }

    //为此AM发送一个MR AM启动的事件。
    dispatcher.getEventHandler().handle(
        new JobHistoryEvent(job.getID(), new AMStartedEvent(amInfo
            .getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(),
            amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo
                .getNodeManagerHttpPort(), this.forcedState == null ? null
                    : this.forcedState.toString(), appSubmitTime)));
    amInfos.add(amInfo);

    //metrics system(度量系统)初始化并启动
    DefaultMetricsSystem.initialize("MRAppMaster");

    boolean initFailed = false;
    if (!errorHappenedShutDown) {
      // create a job event for job initialization
      JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
      // Send init to the job (this does NOT trigger job execution)
      // This is a synchronous call, not an event through dispatcher. We want
      // job-init to be done completely here.
      jobEventDispatcher.handle(initJobEvent);

      // If job is still not initialized, an error happened during
      // initialization. Must complete starting all of the services so failure
      // events can be processed.
      initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED);

      // JobImpl's InitTransition is done (call above is synchronous), so the
      // "uber-decision" (MR-1220) has been made.  Query job and switch to
      // ubermode if appropriate (by registering different container-allocator
      // and container-launcher services/event-handlers).

      if (job.isUber()) {
        speculatorEventDispatcher.disableSpeculation();
        LOG.info("MRAppMaster uberizing job " + job.getID()
            + " in local container (\"uber-AM\") on node "
            + nmHost + ":" + nmPort + ".");
      } else {
        // send init to speculator only for non-uber jobs. 
        // This won't yet start as dispatcher isn't started yet.
        dispatcher.getEventHandler().handle(
            new SpeculatorEvent(job.getID(), clock.getTime()));
        LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
            + "job " + job.getID() + ".");
      }
      // Start ClientService here, since it's not initialized if
      // errorHappenedShutDown is true
      clientService.start();
    }
    //启动所有组件
    super.serviceStart();

    //最终设置作业类加载器
    MRApps.setClassLoader(jobClassLoader, getConfig());

    if (initFailed) {
      JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
      jobEventDispatcher.handle(initFailedEvent);
    } else {
      //所有组件都已启动后,启动作业
      startJobs();
    }
  }

下面我们接着看startJobs(),可以覆盖此项以实例化多个作业并创建工作流

protected void startJobs() {
    /** 创建一个作业启动事件(JobEventType.JOB_START)来启动 */
    JobEvent startJobEvent = new JobStartEvent(job.getID(),
        recoveredJobStartTime);
    /** 发送作业启动事件。这将触发作业执行 */
    dispatcher.getEventHandler().handle(startJobEvent);
  }

下面我们看下JobEventType.JOB_START事件的处理,作业启动事件和处理程序在JobImpl中。

2、JobImpl

JobImpl是作业界面的实施。维护作业的状态机。读和写调用使用ReadWriteLock实现并发。

关于作业的状态有NEW、INITED、SETUP、RUNNING、KILL_WAIT、COMMITTING、SUCCEEDED、FAIL_WAIT、FAIL_ABORT、KILL_ABORT、FAILED、KILLED、INTERNAL_ERROR、AM_REBOOT等,有兴趣的同学可以跟读下每个状态的转换细节,这里不一一跟读了,

protected static final
    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> 
       stateMachineFactory
     = new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
              (JobStateInternal.NEW)

          // Transitions from NEW state
          

          // Transitions from INITED state
          .addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
              JobEventType.JOB_START,
              new StartTransition())

          // Transitions from SETUP state
          .addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,
              JobEventType.JOB_SETUP_COMPLETED,
              new SetupCompletedTransition())

          // Transitions from RUNNING state
          

          // Transitions from KILL_WAIT state.
          

          // Transitions from COMMITTING state
          

          // Transitions from SUCCEEDED state
          

          // Transitions from FAIL_WAIT state
         

          //Transitions from FAIL_ABORT state
         

          // Transitions from KILL_ABORT state
          

          // Transitions from FAILED state
          

          // Transitions from KILLED state
          

          // No transitions from INTERNAL_ERROR state. Ignore all.
          

          // No transitions from AM_REBOOT state. Ignore all.
          

          // create the topology tables
          .installTopology();

以下是JobEventType.JOB_START事件的处理程序

public static class StartTransition
  implements SingleArcTransition<JobImpl, JobEvent> {
    /**
     * 这个转换在事件调度器线程中执行,尽管它是在MRAppMaster的startJobs()方法中触发的。
     */
    @Override
    public void transition(JobImpl job, JobEvent event) {
      JobStartEvent jse = (JobStartEvent) event;
      if (jse.getRecoveredJobStartTime() != -1L) {
        job.startTime = jse.getRecoveredJobStartTime();
      } else {
        job.startTime = job.clock.getTime();
      }
      JobInitedEvent jie =
        new JobInitedEvent(job.oldJobId,
             job.startTime,
             job.numMapTasks, job.numReduceTasks,
             job.getState().toString(),
             job.isUber());
      job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
      JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
          job.appSubmitTime, job.startTime);
      job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));
      job.metrics.runningJob(job);
      
      //CommitterEventType.JOB_SETUP事件处理
      job.eventHandler.handle(new CommitterJobSetupEvent(
              job.jobId, job.jobContext));
    }
  }

JOB_SETUP事件是由CommitterEventHandler处理

3、CommitterEventHandler

CommitterEventHandler负责处理JOB_SETUP、JOB_COMMIT、JOB_ABORT、TASK_ABORT事件

public class CommitterEventHandler extends AbstractService
    implements EventHandler<CommitterEvent> {
    
    public void run() {
      LOG.info("Processing the event " + event.toString());
      switch (event.getType()) {
      case JOB_SETUP:
        handleJobSetup((CommitterJobSetupEvent) event);
        break;
      case JOB_COMMIT:
        handleJobCommit((CommitterJobCommitEvent) event);
        break;
      case JOB_ABORT:
        handleJobAbort((CommitterJobAbortEvent) event);
        break;
      case TASK_ABORT:
        handleTaskAbort((CommitterTaskAbortEvent) event);
        break;
      default:
        throw new YarnRuntimeException("Unexpected committer event "
            + event.toString());
      }

    }

    //处理JOB_SETUP事件
    protected void handleJobSetup(CommitterJobSetupEvent event) {
      try {
        committer.setupJob(event.getJobContext());
        //现在job的状态为JobEventType.JOB_SETUP_COMPLETED
        context.getEventHandler().handle(
            new JobSetupCompletedEvent(event.getJobID()));
      } catch (Exception e) {
        LOG.warn("Job setup failed", e);
        context.getEventHandler().handle(new JobSetupFailedEvent(
            event.getJobID(), StringUtils.stringifyException(e)));
      }
    }




}

4、再回JobImpl

JobEventType.JOB_SETUP_COMPLETED的处理程序为SetupCompletedTransition(),在第2步中有。

private static class SetupCompletedTransition
      implements SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      job.setupProgress = 1.0f;
      //调度MapTask
      job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
      //调度ReduceTask
      job.scheduleTasks(job.reduceTasks, true);

      //如果没有任务,只需过渡到已完成的工作状态
      if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
        job.eventHandler.handle(new JobEvent(job.jobId,
            JobEventType.JOB_COMPLETED));
      }
    }
  }


  protected void scheduleTasks(Set<TaskId> taskIDs,
      boolean recoverTaskOutput) {
    for (TaskId taskID : taskIDs) {
      TaskInfo taskInfo = completedTasksFromPreviousRun.remove(taskID);
      if (taskInfo != null) {
        //如果是重试任务需要覆盖
        eventHandler.handle(new TaskRecoverEvent(taskID, taskInfo,
            committer, recoverTaskOutput));
      } else {
        //新任务,需要做任务调度处理,我们看这块的逻辑
        eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_SCHEDULE));
      }
    }
  }

5、TaskImpl

TaskImpl是任务接口的实现,维护任务的状态机。任务的状态有NEW、SCHEDULED、RUNNING、KILL_WAIT、SUCCEEDED、FAILED

private static final StateMachineFactory
               <TaskImpl, TaskStateInternal, TaskEventType, TaskEvent> 
            stateMachineFactory 
           = new StateMachineFactory<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
               (TaskStateInternal.NEW)

    // 定义Task的状态机

    // Transitions from NEW state
    .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED, 
        TaskEventType.T_SCHEDULE, new InitialScheduleTransition())


    // Transitions from SCHEDULED state
      //启动第一次尝试时,任务状态设置为RUNNING

 
    // Transitions from RUNNING state
    

    // Transitions from KILL_WAIT state
    

    // Transitions from SUCCEEDED state
   

    // Transitions from FAILED state        


    // create the topology tables
    .installTopology();

可以看到处理事件调用的是InitialScheduleTransition()

private static class InitialScheduleTransition
  implements SingleArcTransition<TaskImpl, TaskEvent> {

  @Override
  public void transition(TaskImpl task, TaskEvent event) {
    task.addAndScheduleAttempt(Avataar.VIRGIN);
    task.scheduledTime = task.clock.getTime();
    task.sendTaskStartedEvent();
  }
}

private void addAndScheduleAttempt(Avataar avataar, boolean reschedule) {
    TaskAttempt attempt = addAttempt(avataar);
    inProgressAttempts.add(attempt.getID());
    //schedule the nextAttemptNumber
    if (failedAttempts.size() > 0 || reschedule) {
      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
          TaskAttemptEventType.TA_RESCHEDULE));
    } else {
      //将任务状态变成TaskAttemptEventType.TA_SCHEDULE)
      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
          TaskAttemptEventType.TA_SCHEDULE));
    }
  }

private void sendTaskStartedEvent() {
    launchTime = getLaunchTime();
    //创建事件以记录任务的开始
    TaskStartedEvent tse = new TaskStartedEvent(
        TypeConverter.fromYarn(taskId), launchTime,
        TypeConverter.fromYarn(taskId.getTaskType()),
        getSplitsAsString());
    eventHandler
        .handle(new JobHistoryEvent(taskId.getJobId(), tse));
    historyTaskStartGenerated = true;
  }

  public static org.apache.hadoop.mapreduce.TaskType fromYarn(
      TaskType taskType) {
    switch (taskType) {
    case MAP:
      return org.apache.hadoop.mapreduce.TaskType.MAP;
    case REDUCE:
      return org.apache.hadoop.mapreduce.TaskType.REDUCE;
    default:
      throw new YarnRuntimeException("Unrecognized task type: " + taskType);
    }
  }

6、TaskAttemptImpl

TaskAttemptImpl是尝试任务的实现,因为有失败重试机制,因此每一次在容器中运行的任务都先称为尝试任务,当尝试任务运行成功后,对应的任务也会标记为成功。

TaskAttemptImpl维护尝试任务的状态机。任务的状态有NEW、UNASSIGNED、ASSIGNED、RUNNING、SUCCESS_FINISHING_CONTAINER、FAIL_FINISHING_CONTAINER、COMMIT_PENDING、SUCCESS_CONTAINER_CLEANUP、FAIL_CONTAINER_CLEANUP、KILL_CONTAINER_CLEANUP、FAIL_TASK_CLEANUP、KILL_TASK_CLEANUP、SUCCEEDED、FAILED、KILLED

private static final StateMachineFactory
        <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
        stateMachineFactory
    = new StateMachineFactory
             <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
           (TaskAttemptStateInternal.NEW)

     // Transitions from the NEW state.
     .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
         TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition(false))
   

     // Transitions from the UNASSIGNED state.
     .addTransition(TaskAttemptStateInternal.UNASSIGNED,
         TaskAttemptStateInternal.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
         new ContainerAssignedTransition())
    

     // Transitions from the ASSIGNED state.
   

     // Transitions from RUNNING state.
     

     // Transitions from SUCCESS_FINISHING_CONTAINER state
    

     // Transitions from COMMIT_PENDING state
     

      // Transitions from SUCCESS_CONTAINER_CLEANUP state
      // kill and cleanup the container
      

     // Transitions from FAIL_CONTAINER_CLEANUP state.
     

      // Transitions from KILL_CONTAINER_CLEANUP
     

     // Transitions from FAIL_TASK_CLEANUP
     // run the task cleanup
    

     // Transitions from KILL_TASK_CLEANUP
    

      // Transitions from SUCCEEDED
     

     // Transitions from FAILED state


     // Transitions from KILLED state


     // create the topology tables
     .installTopology();

可以看到处理TaskAttemptEventType.TA_SCHEDULE事件的逻辑是RequestContainerTransition()

 static class RequestContainerTransition implements
      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
    private final boolean rescheduled;
    public RequestContainerTransition(boolean rescheduled) {
      this.rescheduled = rescheduled;
    }
    @SuppressWarnings("unchecked")
    @Override
    public void transition(TaskAttemptImpl taskAttempt, 
        TaskAttemptEvent event) {
      //告诉任何投机者我们正在请求一个容器
      taskAttempt.eventHandler.handle
          (new SpeculatorEvent(taskAttempt.getID().getTaskId(), +1));
      //申请容器
      if (rescheduled) {
        taskAttempt.eventHandler.handle(
            ContainerRequestEvent.createContainerRequestEventForFailedContainer(
                taskAttempt.attemptId, 
                taskAttempt.resourceCapability));
      } else {
        //处理ContainerAllocator.EventType.CONTAINER_REQ事件
        taskAttempt.eventHandler.handle(new ContainerRequestEvent(
            taskAttempt.attemptId, taskAttempt.resourceCapability,
            taskAttempt.dataLocalHosts.toArray(
                new String[taskAttempt.dataLocalHosts.size()]),
            taskAttempt.dataLocalRacks.toArray(
                new String[taskAttempt.dataLocalRacks.size()])));
      }
    }
  }

7、LocalContainerAllocator

LocalContainerAllocator负责在本地分配容器。不分配真正的容器;而是为所有请求发送一个已分配的事件。也处理ContainerAllocator.EventType.CONTAINER_REQ事件

public void handle(ContainerAllocatorEvent event) {
    if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
      LOG.info("Processing the event " + event.toString());
      //分配与AM相同的容器ID
      ContainerId cID =
          ContainerId.newContainerId(getContext().getApplicationAttemptId(),
            this.containerId.getContainerId());
      Container container = recordFactory.newRecordInstance(Container.class);
      container.setId(cID);
      NodeId nodeId = NodeId.newInstance(this.nmHost, this.nmPort);
      container.setResource(Resource.newInstance(0, 0));
      container.setNodeId(nodeId);
      container.setContainerToken(null);
      container.setNodeHttpAddress(this.nmHost + ":" + this.nmHttpPort);
      //将容器分配的事件发送到任务尝试

      if (event.getAttemptID().getTaskId().getTaskType() == TaskType.MAP) {
        JobCounterUpdateEvent jce =
            new JobCounterUpdateEvent(event.getAttemptID().getTaskId()
                .getJobId());
        // TODO Setting OTHER_LOCAL_MAP for now.
        jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
        eventHandler.handle(jce);
      }
      //此时处理TaskAttemptEventType.TA_ASSIGNED事件
      eventHandler.handle(new TaskAttemptContainerAssignedEvent(
          event.getAttemptID(), container, applicationACLs));
    }
  }

8、再回TaskAttemptImpl

第6步已经写了TaskAttemptEventType.TA_ASSIGNED事件的处理逻辑:new ContainerAssignedTransition()

private static class ContainerAssignedTransition implements
      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
    @SuppressWarnings({ "unchecked" })
    @Override
    public void transition(final TaskAttemptImpl taskAttempt, 
        TaskAttemptEvent event) {
      final TaskAttemptContainerAssignedEvent cEvent = 
        (TaskAttemptContainerAssignedEvent) event;
      Container container = cEvent.getContainer();
      taskAttempt.container = container;
      //这是真正的Task
      taskAttempt.remoteTask = taskAttempt.createRemoteTask();
      taskAttempt.jvmID =
          new WrappedJvmID(taskAttempt.remoteTask.getTaskID().getJobID(),
              taskAttempt.remoteTask.isMapTask(),
              taskAttempt.container.getId().getContainerId());
      taskAttempt.taskAttemptListener.registerPendingTask(
          taskAttempt.remoteTask, taskAttempt.jvmID);

      taskAttempt.computeRackAndLocality();
      
      //启动容器
      //为给定的Task尝试创建要启动的容器对象
      ContainerLaunchContext launchContext = createContainerLaunchContext(
          cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
          taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
          taskAttempt.taskAttemptListener, taskAttempt.credentials);
      taskAttempt.eventHandler
        .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId,
          launchContext, container, taskAttempt.remoteTask));

      // 向投机者发送我们的容器需求得到满足的事件
      taskAttempt.eventHandler.handle
          (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
    }
  }


static ContainerLaunchContext createContainerLaunchContext(
      Map<ApplicationAccessType, String> applicationACLs,
      Configuration conf, Token<JobTokenIdentifier> jobToken, Task remoteTask,
      final org.apache.hadoop.mapred.JobID oldJobId,
      WrappedJvmID jvmID,
      TaskAttemptListener taskAttemptListener,
      Credentials credentials) {

    synchronized (commonContainerSpecLock) {
      if (commonContainerSpec == null) {
        commonContainerSpec = createCommonContainerLaunchContext(
            applicationACLs, conf, jobToken, oldJobId, credentials);
      }
    }

    //填写通用规范中缺少的每个容器所需的字段

    boolean userClassesTakesPrecedence =
      conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);

    //通过从公共环境克隆来设置环境。
    Map<String, String> env = commonContainerSpec.getEnvironment();
    Map<String, String> myEnv = new HashMap<String, String>(env.size());
    myEnv.putAll(env);
    if (userClassesTakesPrecedence) {
      myEnv.put(Environment.CLASSPATH_PREPEND_DISTCACHE.name(), "true");
    }
    MapReduceChildJVM.setVMEnv(myEnv, remoteTask);

    //设置启动命令 这里会调用MapReduceChildJVM的getVMCommand()
    List<String> commands = MapReduceChildJVM.getVMCommand(
        taskAttemptListener.getAddress(), remoteTask, jvmID);

    //复制ByteBuffers以供多个容器访问。
    Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
    for (Entry<String, ByteBuffer> entry : commonContainerSpec
                .getServiceData().entrySet()) {
      myServiceData.put(entry.getKey(), entry.getValue().duplicate());
    }

    //构建实际的容器
    ContainerLaunchContext container = ContainerLaunchContext.newInstance(
        commonContainerSpec.getLocalResources(), myEnv, commands,
        myServiceData, commonContainerSpec.getTokens().duplicate(),
        applicationACLs);

    return container;
  }

9、MapReduceChildJVM

这里就会加载YarnChild,启动运行MapTask、ReduceTask的容器

public static List<String> getVMCommand(
      InetSocketAddress taskAttemptListenerAddr, Task task, 
      JVMId jvmID) {

    TaskAttemptID attemptID = task.getTaskID();
    JobConf conf = task.conf;

    Vector<String> vargs = new Vector<String>(8);

    vargs.add(MRApps.crossPlatformifyMREnv(task.conf, Environment.JAVA_HOME)
        + "/bin/java");

    // Add child (task) java-vm options.
    //
    // The following symbols if present in mapred.{map|reduce}.child.java.opts 
    // value are replaced:
    // + @taskid@ is interpolated with value of TaskID.
    // Other occurrences of @ will not be altered.
    //
    // Example with multiple arguments and substitutions, showing
    // jvm GC logging, and start of a passwordless JVM JMX agent so can
    // connect with jconsole and the likes to watch child memory, threads
    // and get thread dumps.
    //
    //  <property>
    //    <name>mapred.map.child.java.opts</name>
    //    <value>-Xmx 512M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
    //           -Dcom.sun.management.jmxremote.authenticate=false \
    //           -Dcom.sun.management.jmxremote.ssl=false \
    //    </value>
    //  </property>
    //
    //  <property>
    //    <name>mapred.reduce.child.java.opts</name>
    //    <value>-Xmx 1024M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
    //           -Dcom.sun.management.jmxremote.authenticate=false \
    //           -Dcom.sun.management.jmxremote.ssl=false \
    //    </value>
    //  </property>
    //
    String javaOpts = getChildJavaOpts(conf, task.isMapTask());
    javaOpts = javaOpts.replace("@taskid@", attemptID.toString());
    String [] javaOptsSplit = javaOpts.split(" ");
    for (int i = 0; i < javaOptsSplit.length; i++) {
      vargs.add(javaOptsSplit[i]);
    }

    Path childTmpDir = new Path(MRApps.crossPlatformifyMREnv(conf, Environment.PWD),
        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
    vargs.add("-Djava.io.tmpdir=" + childTmpDir);
    MRApps.addLog4jSystemProperties(task, vargs, conf);

    if (conf.getProfileEnabled()) {
      if (conf.getProfileTaskRange(task.isMapTask()
                                   ).isIncluded(task.getPartition())) {
        final String profileParams = conf.get(task.isMapTask()
            ? MRJobConfig.TASK_MAP_PROFILE_PARAMS
            : MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, conf.getProfileParams());
        vargs.add(String.format(profileParams,
            getTaskLogFile(TaskLog.LogName.PROFILE)));
      }
    }

    // Add main class and its arguments 
    vargs.add(YarnChild.class.getName());  // main of Child
    // pass TaskAttemptListener's address
    vargs.add(taskAttemptListenerAddr.getAddress().getHostAddress()); 
    vargs.add(Integer.toString(taskAttemptListenerAddr.getPort())); 
    vargs.add(attemptID.toString());                      // pass task identifier

    // Finally add the jvmID
    vargs.add(String.valueOf(jvmID.getId()));
    vargs.add("1>" + getTaskLogFile(TaskLog.LogName.STDOUT));
    vargs.add("2>" + getTaskLogFile(TaskLog.LogName.STDERR));

    // Final commmand
    StringBuilder mergedCommand = new StringBuilder();
    for (CharSequence str : vargs) {
      mergedCommand.append(str).append(" ");
    }
    Vector<String> vargsFinal = new Vector<String>(1);
    vargsFinal.add(mergedCommand.toString());
    return vargsFinal;
  }

五、总结

1、MRAppMaster启动

2、初始化并启动job

3、处理各种job状态

4、启动Task

5、处理各种Task事件

6、启动尝试任务

7、处理各种尝试任务事件

8、在尝试任务的TaskAttemptEventType.TA_SCHEDULE事件处理时申请容器

9、调用java命令配置主类YarnChild启动容器运行MapTask或者ReduceTask

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1419261.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何让wordpress首页只显示某一篇文章全部内容?在您的主页显示选择

大多数WordPress站点首页默认都是显示最新发布的文章列表&#xff0c;不过有些站点比较特殊&#xff0c;只想显示某一篇文章的全部内容&#xff0c;那么应该怎么设置呢&#xff1f; 其实&#xff0c;WordPress后台 >> 设置 >> 阅读 >> 在“您的主页显示”中…

VS+QT 配置Eigen库

1、下载Eigen库&#xff0c;如下&#xff1a; 2、解压到项目目录下&#xff0c;如下&#xff1a; 3、 在C/C中包含文件&#xff0c;如下&#xff1a; 4、在头文件中加入如下代码&#xff1a; 5、测试代码&#xff1a; //.cpp文件 #include "testEigen.h"testEigen::…

【Python】03快速上手爬虫案例三:搞定药师帮

文章目录 前言1、破解验证码2、获取数据 前言 流程&#xff1a;通过用户名、密码、搞定验证码&#xff0c;登录进药师帮网站&#xff0c;然后抓取想要的数据。 爬取数据&#xff0c;最终效果图&#xff1a; 1、破解验证码 使用药师帮测试系统&#xff1a;https://dianrc.ysb…

快速入门存内计算—助力人工智能加速深度学习模型的训练和推理

存内计算&#xff1a;提高计算性能和能效的新技术 传统的计算机架构是将数据存储在存储器中&#xff0c;然后将数据传输到计算单元进行处理。这种架构存在一个性能瓶颈&#xff0c;即数据传输延迟。存内计算通过将计算单元集成到存储器中&#xff0c;消除了数据传输延迟&#…

HiveSQL题——窗口函数(lag/lead)

目录 一、窗口函数的知识点 1.1 窗户函数的定义 1.2 窗户函数的语法 1.3 窗口函数分类 1.4 前后函数:lag/lead 二、实际案例 2.1 股票的波峰波谷 0 问题描述 1 数据准备 2 数据分析 3 小结 2.2 前后列转换&#xff08;面试题&#xff09; 0 问题描述 1 数据准备 …

kubernetes-快速部署一套k8s集群

1、前置知识点 1.1 生产环境可部署Kubernetes集群的两种方式 目前生产部署Kubernetes集群主要有两种方式&#xff1a; kubeadm Kubeadm是一个K8s部署工具&#xff0c;提供kubeadm init和kubeadm join&#xff0c;用于快速部署Kubernetes集群。 二进制包 从github下载发行…

04.对象树

一、引入 1.QT实现输出"hello world" 使用QT编写"hello world"程序&#xff0c;有两种实现方式&#xff1a; &#xff08;1&#xff09;直接在生成的ui文件中&#xff0c;拖入一个label控件&#xff0c;双击控件编辑内容即可实现 &#xff08;2&#xff0…

【C++历练之路】探秘C++三大利器之一——多态

W...Y的主页 &#x1f60a; 代码仓库分享&#x1f495; 前言&#x1f354;: 在计算机科学的广袤领域中&#xff0c;C多态性是一门令人着迷的技术艺术&#xff0c;它赋予我们的代码更强大的灵活性和可维护性。想象一下&#xff0c;你正在构建一个程序&#xff0c;需要适应不断…

【技术分享】远程透传网关-单网口快速实现威纶通触摸屏程序远程上下载

准备工作 一台可联网操作的电脑一台单网口的远程透传网关及博达远程透传配置工具网线一条&#xff0c;用于实现网络连接和连接触摸屏一台威纶通触摸屏及其编程软件一张4G卡或WIFI天线实现通讯(使用4G联网则插入4G SIM卡&#xff0c;WIFI联网则将WIFI天线插入USB口&#xff09;…

Redis 面试题 | 19.精选Redis高频面试题

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

Mac安装及配置MySql及图形化工具MySQLworkbench安装

Mac下载配置MySql mysql下载及安装 下载地址&#xff1a;https://dev.mysql.com/downloads/mysql/ 根据自己电脑确定下载x86还是ARM版本的 如果不确定&#xff0c;可以查看自己电脑版本&#xff0c;终端输入命令 uname -a 点击Download下载&#xff0c;可跳过登录注册&…

沙龙回顾|“强标”发布在即,汽车数据安全的挑战与应对

随着智能汽车产业驶入发展快车道&#xff0c;“数据安全”的重要性也日益突出。2020年以来发现的针对整车企业、车联网信息服务提供商等相关企业的恶意攻击达到280余万次。2023年初至今&#xff0c;就发生超过20起与车企相关数据泄露事件&#xff0c;汽车数据安全的现状不容乐观…

基于Matlab无刷直流电机系统仿真建模的新方法

摘 要&#xff1a;在分析无刷直流电机&#xff08;BLDC&#xff09;数学模型的基础上&#xff0c;提出了无刷直流电机系统仿真建模的 新方法。在Matlab/Simulink 中&#xff0c;建立独立的功能模块&#xff0c;如BLDC 本体模块、电流滞环控制模块、 速度控制模块等&#xff0c;…

防御保护--智能选路

目录 就近选路 策略选路--PBR DSCP优先级 智能选路--全局路由策略 1.基于链路带宽的负载分担 2.基于链路质量进行负载分担 3.基于链路权重进行负载分担 4.基于链路优先级的主备备份 ​编辑 DNS透明代理 就近选路 我们希望在访问不同运营商服务器时&#xff0c;通过对…

IDEA安装MyBatisX插件

IDEA工具在开发人员中经常使用&#xff0c;从dao层到xml文件对应的查看很费劲&#xff0c;这时候就有相应的插件工具出现了MyBatisX。他的好处如下&#xff1a; mapper and xml can jump back and forth mybatis.xml,mapper.xml prompt mapper and xml support auto prompt lik…

多场景建模:腾讯3MN

3MN: Three Meta Networks for Multi-Scenario and Multi-Task Learning in Online Advertising Recommender Systems 背景 推荐领域的多场景多任务学习&#xff1a;维护单模型即可节省资源也可节省人力&#xff1b;各个场景的数据共享&#xff0c;理论上面学习是更加充分的 …

RK3568 Android Launcher3定制修改

1.去掉Google搜索栏 目录packages/apps/Launcher3/src_build_config/com/android/launcher3/BuildConfig.java 修改如下&#xff1a; 2.Launcher首页去掉抽屉菜单&#xff0c;所有应用都放到桌面 第一步&#xff1a;禁止上滑显示抽屉 在目录packages/apps/Launcher3/quickste…

大模型学习与实践笔记(十四)

使用 OpenCompass 评测 InternLM2-Chat-7B 模型使用 LMDeploy 0.2.0 部署后在 C-Eval 数据集上的性能 步骤1&#xff1a;下载internLM2-Chat-7B 模型,并进行挂载 以下命令将internlm2-7b模型挂载到当前目录下&#xff1a; ln -s /share/model_repos/internlm2-7b/ ./ 步骤2&…

非阿里云注册域名如何在云解析DNS设置解析?

概述 非阿里云注册域名使用云解析DNS&#xff0c;按照如下步骤&#xff1a; 添加域名。 添加解析记录。 修改DNS服务器。 DNS服务器变更全球同步&#xff0c;等待48小时。 添加解析记录 登录云解析DNS产品控制台。 在 域名解析 页面中&#xff0c;单击 添加域名 。 在 …

虚拟创业团队如何建设

虚拟创业团队如何建设 一、目标设定 在组建虚拟创业团队之前&#xff0c;明确团队目标是至关重要的。目标应具体、可衡量、可实现&#xff0c;并与团队成员共享。通过设定共同的目标&#xff0c;团队成员能够更好地理解团队愿景&#xff0c;明确个人职责&#xff0c;并朝着同…