Hadoop-MapReduce-MRAppMaster启动篇

news2024/11/18 11:41:48

 一、源码下载

下面是hadoop官方源码下载地址,我下载的是hadoop-3.2.4,那就一起来看下吧

Index of /dist/hadoop/core

二、上下文

在上一篇<Hadoop-MapReduce-源码跟读-客户端篇>中已经将到:作业提交到ResourceManager,那么对于该Job第一个容器(MRAppMaster)是怎么启动的呢?接下来我们一起来看看

三、结论

MRJobConfig是一个MRJob的配置,里面包含了Map、Reduce、Combine类以及Job名称、用户名称、队列名称、MapTask数量、ReduceTask数量、工作目录,jar在本地的路径、任务超时时间、任务id、输入输出目录,每个任务的内存大小和cpu核数等等。

此外它里面还有一个属性,如下:

package org.apache.hadoop.mapreduce;
public interface MRJobConfig {
    //......省略......

    public static final String APPLICATION_MASTER_CLASS =
          "org.apache.hadoop.mapreduce.v2.app.MRAppMaster";

    public static final String MAPREDUCE_V2_CHILD_CLASS = 
          "org.apache.hadoop.mapred.YarnChild";
    //......省略......
}

MRAppMaster是MapReduce的ApplicationMaster实现,负责整个MapReduce作业的过程调度和状态协调

YarnChid是运行在每个容器中的进程,负责运行某一个MapTask或者ReduceTask,

有兴趣的同学可以看一个任务的Yarn日志,也可以看我的<Hadoop-MapReduce-跟着日志理解整体流程>一篇中的日志,就可以发现ApplicationMaster容器和MapTask、ReduceTask所在容器的的日志开头分别就是MRAppMaster和YarnChid

MRAppMaster的启动参数是在YARNRunner中配置的:

public class YARNRunner implements ClientProtocol {
    private List<String> setupAMCommand(Configuration jobConf) {
    List<String> vargs = new ArrayList<>(8);
    vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
        + "/bin/java");

    //......省略......

    vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);

    //......省略......

    return vargs;
  }
}

YarnChid的启动参数是在MapReduceChildJVM中配置的:

public class MapReduceChildJVM {
  public static List<String> getVMCommand(
      InetSocketAddress taskAttemptListenerAddr, Task task, 
      JVMId jvmID) {

    TaskAttemptID attemptID = task.getTaskID();
    JobConf conf = task.conf;

    Vector<String> vargs = new Vector<String>(8);

    vargs.add(MRApps.crossPlatformifyMREnv(task.conf, Environment.JAVA_HOME)
        + "/bin/java");

    //......省略......

    vargs.add(YarnChild.class.getName());  // main of Child
    
    //......省略......

    return vargsFinal;
  }
}

YarnChid启动后会启动MapTask或者ReduceTask

四、调用细节(源码跟读)

我们接着上一篇<Hadoop-MapReduce-源码跟读-客户端篇>的源码开始分析,即:YARNRunner.submitJob()中的ApplicationSubmissionContext构建

1、YARNRunner

1.1、createApplicationSubmissionContext

//构建启动MR AM所需的所有信息
public ApplicationSubmissionContext createApplicationSubmissionContext(
      Configuration jobConf, String jobSubmitDir, Credentials ts)
      throws IOException {
    //获取applicationId (resMgrDelegate 是 YarnClient 的子类)
    //applicationId是应用程序的全局唯一标识符,通过使用集群时间戳(即ResourceManager的开始时间)以及应用程序的单调递增计数器来实现的。
    ApplicationId applicationId = resMgrDelegate.getApplicationId();

    //设置本地资源
    //LocalResource表示运行容器所需的本地资源
    //NodeManager负责在启动容器之前本地化资源
    //应用程序可以指定LocalResourceType和LocalResourceVisibility
    //LocalResourceType:
    //    1、FILE    : 常规文件,即不间断的字节
    //    2、ARCHIVE : 归档,由NodeManager自动取消归档
    //    3、PATTERN : 1和2的混合
    //LocalResourceVisibility:
    //    1、PUBLIC  :     由节点上的所有用户共享
    //    2、PRIVATE :     在节点上同一用户的所有应用程序之间共享
    //    3、APPLICATION : 仅在节点上的同一应用程序的容器之间共享。
    //该方法会设置job配置文件、job jar包的HDFS路径等,最后得到这样一个Map
    //    <"job.xml" , LocalResource>
    //    <"job.jar" , LocalResource>
    //    <"jobSubmitDir/job.split" , LocalResource>
    //    <"jobSubmitDir/job.splitmetainfo , LocalResource>
    Map<String, LocalResource> localResources =
        setupLocalResources(jobConf, jobSubmitDir);

    //设置安全令牌
    DataOutputBuffer dob = new DataOutputBuffer();
    ts.writeTokenStorageToStream(dob);
    ByteBuffer securityTokens =
        ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    //为AM容器设置ContainerLaunchContext
    //ContainerLaunchContext表示NodeManager启动容器所需的所有信息,包括:
    //    1、ContainerId
    //    2、分配给容器的资源
    //    3、容器分配给的用户
    //    4、如果启用了安全性,还需要安全令牌
    //    5、我们上面设置的本地资源
    //    6、可选的、特定于应用程序的二进制服务数据
    //    7、已启动进程的环境变量
    //    8、启动容器的命令(里面包含了AM和Task所在容器的启动类,即结论中的MRAppMaster和YarnChild)
    //    9、容器失败退出时的重试策略
    //***********************************************
    //setupAMCommand方法会设置AM所在容器的启动命令参数,下面我们会展开看看命令是什么样的
    List<String> vargs = setupAMCommand(jobConf);
    ContainerLaunchContext amContainer = setupContainerLaunchContextForAM(
        jobConf, localResources, securityTokens, vargs);

    //设置RM用于续订令牌的配置
    String regex = conf.get(MRJobConfig.MR_JOB_SEND_TOKEN_CONF);
    if (regex != null && !regex.isEmpty()) {
      setTokenRenewerConf(amContainer, conf, regex);
    }


    Collection<String> tagsFromConf =
        jobConf.getTrimmedStringCollection(MRJobConfig.JOB_TAGS);

    //设置ApplicationSubmissionContext
    //ApplicationSubmissionContext表示ResourceManager为应用程序启动ApplicationMaster所需的所有信息,包括:
    //    1、ApplicationId
    //    2、Application用户
    //    3、Application名称
    //    4、Application属性
    //    5、执行ApplicationMaster的容器的ContainerLaunchContext,上面我们已经构建了
    //    6、maxAppAttempts。应用程序尝试的最大次数。它应该不大于YARN配置中最大尝试的全局次数。
    //    7、尝试失败有效性间隔。默认值为-1。当以毫秒为单位的attemptFailuresValidationInterval设置为>0时,故障数将不会将发生在validityInterval之外的故障计入故障计数。如果失败计数达到maxAppAttempts,则应用程序将失败。
    //    8、可选,特定于应用程序的LogAggregationContext(LogAggregationContext表示NodeManager处理应用程序日志所需的所有信息。)
    ApplicationSubmissionContext appContext =
        recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
    appContext.setApplicationId(applicationId);                // ApplicationId
    appContext.setQueue(                                       // Queue name
        jobConf.get(JobContext.QUEUE_NAME,
        YarnConfiguration.DEFAULT_QUEUE_NAME));
    // add reservationID if present
    ReservationId reservationID = null;
    try {
      reservationID =
          ReservationId.parseReservationId(jobConf
              .get(JobContext.RESERVATION_ID));
    } catch (NumberFormatException e) {
      // throw exception as reservationid as is invalid
      String errMsg =
          "Invalid reservationId: " + jobConf.get(JobContext.RESERVATION_ID)
              + " specified for the app: " + applicationId;
      LOG.warn(errMsg);
      throw new IOException(errMsg);
    }
    if (reservationID != null) {
      appContext.setReservationID(reservationID);
      LOG.info("SUBMITTING ApplicationSubmissionContext app:" + applicationId
          + " to queue:" + appContext.getQueue() + " with reservationId:"
          + appContext.getReservationID());
    }
    appContext.setApplicationName(                             // Job name
        jobConf.get(JobContext.JOB_NAME,
        YarnConfiguration.DEFAULT_APPLICATION_NAME));
    appContext.setCancelTokensWhenComplete(
        conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
    appContext.setAMContainerSpec(amContainer);         // AM Container
    appContext.setMaxAppAttempts(
        conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
            MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));

    // Setup the AM ResourceRequests
    List<ResourceRequest> amResourceRequests = generateResourceRequests();
    appContext.setAMContainerResourceRequests(amResourceRequests);

    // set labels for the AM container requests if present
    String amNodelabelExpression = conf.get(MRJobConfig.AM_NODE_LABEL_EXP);
    if (null != amNodelabelExpression
        && amNodelabelExpression.trim().length() != 0) {
      for (ResourceRequest amResourceRequest : amResourceRequests) {
        amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
      }
    }
    // set labels for the Job containers
    appContext.setNodeLabelExpression(jobConf
        .get(JobContext.JOB_NODE_LABEL_EXP));

    appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
    if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
      appContext.setApplicationTags(new HashSet<>(tagsFromConf));
    }

    String jobPriority = jobConf.get(MRJobConfig.PRIORITY);
    if (jobPriority != null) {
      int iPriority;
      try {
        iPriority = TypeConverter.toYarnApplicationPriority(jobPriority);
      } catch (IllegalArgumentException e) {
        iPriority = Integer.parseInt(jobPriority);
      }
      appContext.setPriority(Priority.newInstance(iPriority));
    }

    return appContext;
  }

1.2、setupAMCommand

private List<String> setupAMCommand(Configuration jobConf) {
    //命令参数长度为固定的8个
    List<String> vargs = new ArrayList<>(8);
    //$JAVA_HOME/bin/java 即java在本地的路径
    vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
        + "/bin/java");

    Path amTmpDir =
        new Path(MRApps.crossPlatformifyMREnv(conf, Environment.PWD),
            YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
    //-Djava.io.tmpdir=容器的临时目录
    vargs.add("-Djava.io.tmpdir=" + amTmpDir);
    MRApps.addLog4jSystemProperties(null, vargs, conf);

    //检查MAP和REDUCE配置中的Java Lib路径使用情况
    warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS, ""),
        "map",
        MRJobConfig.MAP_JAVA_OPTS,
        MRJobConfig.MAP_ENV);
    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, ""),
        "map",
        MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
        MRJobConfig.MAPRED_ADMIN_USER_ENV);
    warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS, ""),
        "reduce",
        MRJobConfig.REDUCE_JAVA_OPTS,
        MRJobConfig.REDUCE_ENV);
    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, ""),
        "reduce",
        MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
        MRJobConfig.MAPRED_ADMIN_USER_ENV);

    //在用户命令选择之前添加AM管理命令选项,以便用户可以覆盖它
    String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
        MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
    warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
        MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
    //默认是-Xmx1024m ,用户可以通过yarn.app.mapreduce.am.admin-command-opts设置
    vargs.add(mrAppMasterAdminOptions);

    //添加AM用户命令选项
    String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
        MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
    warnForJavaLibPath(mrAppMasterUserOptions, "app master",
        MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
    默认是-Xmx1024m ,用户可以通过yarn.app.mapreduce.am.command-opts设置
    vargs.add(mrAppMasterUserOptions);

    //默认false,可以通过yarn.app.mapreduce.am.profile设置
    if (jobConf.getBoolean(MRJobConfig.MR_AM_PROFILE,
        MRJobConfig.DEFAULT_MR_AM_PROFILE)) {
      final String profileParams = jobConf.get(MRJobConfig.MR_AM_PROFILE_PARAMS,
          MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS);
      if (profileParams != null) {
        //默认是-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=<LOG_DIR>/profile.out
        vargs.add(String.format(profileParams,
            ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR
                + TaskLog.LogName.PROFILE));
      }
    }

    //这里就是设置的启动类org.apache.hadoop.mapreduce.v2.app.MRAppMaster
    vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
    //1><LOG_DIR>/stdout
    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
        Path.SEPARATOR + ApplicationConstants.STDOUT);
    //2><LOG_DIR>/stderr
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
        Path.SEPARATOR + ApplicationConstants.STDERR);
    return vargs;
  }

最终命令参数如下:

1、$JAVA_HOME/bin/java 
2、org.apache.hadoop.mapreduce.v2.app.MRAppMaster 
3、-Djava.io.tmpdir=容器的临时目录 
4、-Xmx1024m 
5、-Xmx1024m 
-6、agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=<LOG_DIR>/profile.out 
7、1><LOG_DIR>/stdout 
8、2><LOG_DIR>/stderr

1.3、submitJob

上面已经构建好了ApplicationSubmissionContext,下面可以提交了

public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
  throws IOException, InterruptedException {
   
    //......省略......

    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    //向ResourceManager提交ApplicationSubmissionContext 
      ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext);

    //......省略......

  }

2、YarnClient

resMgrDelegate是YarnClient的子类,最终也是通过调用YarnClient.submitApplication()提交到Yarn

该方法注释如下:

向YARN提交新申请,这是一个阻塞调用-在提交的应用程序成功提交并被ResourceManager接受之前,它不会返回ApplicationId。

用户在提交新应用程序时应提供ApplicationId作为参数ApplicationSubmissionContext的一部分,否则将引发ApplicationIdNotProvideredException。

这在内部调用ApplicationClientProtocol.submitApplication(SubmitApplicationRequest),之后,它在内部调用ApplicationClientProtocol.getApplicationReport(GetApplicationReportRequest)并等待,直到它可以确保应用程序正确提交为止。如果在ResourceManager保存应用程序的状态之前RM发生故障转移或RM重新启动,ApplicationClientProtocol.getApplicationReport(GetApplicationReportRequest)将抛出ApplicationNotFoundException。当此API捕获到ApplicationNotFoundException时,它会自动重新提交具有相同ApplicationSubmissionContext的应用程序。

YarnClient.submitApplication() 由子类实现,既:YarnClientImpl.submitApplication()

YarnClientImpl

public ApplicationId
      submitApplication(ApplicationSubmissionContext appContext)
          throws YarnException, IOException {
    ApplicationId applicationId = appContext.getApplicationId();
    if (applicationId == null) {
      throw new ApplicationIdNotProvidedException(
          "ApplicationId is not provided in ApplicationSubmissionContext");
    }
    //构建SubmitApplicationRequest
    //客户端发送的请求向ResourceManager提交应用程序
    //该请求通过ApplicationSubmissionContext包含队列、运行ApplicationMaster所需的资源等详细信息,相当于启动ApplicationMaster的ContainerLaunchContext等
    SubmitApplicationRequest request =
        Records.newRecord(SubmitApplicationRequest.class);
    request.setApplicationSubmissionContext(appContext);

    //自动将时间线DT添加到CLC中仅当安全和时间线服务都启用时
    if (isSecurityEnabled() && timelineV1ServiceEnabled) {
      addTimelineDelegationToken(appContext.getAMContainerSpec());
    }

    //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
    //在submitApplication调用期间处理RM故障切换。
    //这里会调用ApplicationClientProtocol.submitApplication()
    rmClient.submitApplication(request);

    int pollCount = 0;
    long startTime = System.currentTimeMillis();
    EnumSet<YarnApplicationState> waitingStates = 
                                 EnumSet.of(YarnApplicationState.NEW,
                                 YarnApplicationState.NEW_SAVING,
                                 YarnApplicationState.SUBMITTED);
    EnumSet<YarnApplicationState> failToSubmitStates = 
                                  EnumSet.of(YarnApplicationState.FAILED,
                                  YarnApplicationState.KILLED);		
    while (true) {
      try {
        ApplicationReport appReport = getApplicationReport(applicationId);
        YarnApplicationState state = appReport.getYarnApplicationState();
        //一直监控应用状态,直到状态不再是NEW、NEW_SAVING、SUBMITTED中的一个
        if (!waitingStates.contains(state)) {
          if(failToSubmitStates.contains(state)) {
            throw new YarnException("Failed to submit " + applicationId + 
                " to YARN : " + appReport.getDiagnostics());
          }
          LOG.info("Submitted application " + applicationId);
          break;
        }

        //......省略......
    }

    return applicationId;
  }

3、ApplicationClientProtocol

ApplicationClientProtocol是客户端和ResourceManager之间的协议,用于提交/中止作业以及获取有关应用程序、集群指标、节点、队列和ACL的信息。

submitApplication()注释如下:

客户端用于向ResourceManager提交新应用程序的接口

客户端需要通过SubmitApplicationRequest提供运行ApplicationMaster所需的队列、资源等详细信息,相当于ContainerLaunchContext,用于启动ApplicationMaster等

当前,ResourceManager在接受提交时立即(空)发送SubmitApplicationResponse,如果拒绝提交则抛出异常。但是,此调用之后需要执行getApplicationReport(GetApplicationReportRequest)以确保正确提交应用程序-从ResourceManager获得SubmitApplicationResponse并不保证RM在故障转移或重新启动之后“记住”此应用程序。如果在ResourceManager成功保存应用程序的状态之前发生RM故障切换或RM重新启动,则随后的getApplicationReport(GetApplicationReportReques)将抛出ApplicationNotFoundException。当客户端在(getApplicationReport(GetApplicationReportRequest)调用中遇到ApplicationNotFoundException时,客户端需要使用相同的ApplicationSubmissionContext重新提交应用程序。

在提交过程中,它会检查应用程序是否已经存在。如果应用程序存在,它将简单地返回SubmitApplicationResponse

在安全模式下,ResourceManager在接受应用程序提交之前验证对队列等的访问权限。

该方法最终会调用其子类ClientRMService.submitApplication()

4、ClientRMService

该类是资源管理器的客户端接口。该模块处理从客户端到资源管理器的所有rpc接口。

public SubmitApplicationResponse submitApplication(
      SubmitApplicationRequest request) throws YarnException, IOException {
    
    //......省略......
    
    //检查应用程序是否已经放入rmContext,如果是,只需返回响应
    if (rmContext.getRMApps().get(applicationId) != null) {
      LOG.info("This is an earlier submitted application: " + applicationId);
      return SubmitApplicationResponse.newInstance();
    }

    //......省略......

      //调用RMAppManager直接提交申请,我们接着看rmAppManager中的实现
      rmAppManager.submitApplication(submissionContext,
          System.currentTimeMillis(), userUgi);

    //......省略......

    return recordFactory
        .newRecordInstance(SubmitApplicationResponse.class);
  }

5、RMAppManager

该类管理资源管理器的应用程序列表。

protected void submitApplication(
      ApplicationSubmissionContext submissionContext, long submitTime,
      UserGroupInformation userUgi) throws YarnException {
    ApplicationId applicationId = submissionContext.getApplicationId();

    //将开始时间传递为-1。它最终将在RMAppImpl构造函数中设置
    RMAppImpl application = createAndPopulateNewRMApp(
        submissionContext, submitTime, userUgi, false, -1, null);
    try {
      //确定UserGroupInformation是使用Kerberos来确定用户身份,还是依赖于简单身份验证
      if (UserGroupInformation.isSecurityEnabled()) {
        this.rmContext.getDelegationTokenRenewer()
            .addApplicationAsync(applicationId,
                BuilderUtils.parseCredentials(submissionContext),
                submissionContext.getCancelTokensWhenComplete(),
                application.getUser(),
                BuilderUtils.parseTokensConf(submissionContext));
      } else {
        //调度程序此时尚未启动,因此应确保在调度程序启动时首先处理这些排队的START事件。
        //下面我们看下这个事件(RMAppEventType.START)是怎么处理的
        this.rmContext.getDispatcher().getEventHandler()
            .handle(new RMAppEvent(applicationId, RMAppEventType.START));
      }
    } catch (Exception e) {
      LOG.warn("Unable to parse credentials for " + applicationId, e);
      // 发送APP_REJECTED是可以的,因为我们假设RMApp处于NEW状态,因此我们还没有通知调度器应用程序的存在
      this.rmContext.getDispatcher().getEventHandler()
          .handle(new RMAppEvent(applicationId,
              RMAppEventType.APP_REJECTED, e.getMessage()));
      throw RPCUtil.getRemoteException(e);
    }
  }

RMAppEventType.START 是由RMAppImpl处理

6、RMAppImpl

private static final StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent> stateMachineFactory
                               = new StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent>(RMAppState.NEW)


     // Transitions from NEW state
    
    //......以上事件省略......

    //我们重点看这个事件的处理,既:RMAppNewlySavingTransition
    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())
    

    //......以下事件省略......

    // Transitions from NEW_SAVING state
    
    //第9步会用到
    .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
        RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
    

     // Transitions from SUBMITTED state
    
    //第10步会用到
    .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
        RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())

     // Transitions from ACCEPTED state
    

     // Transitions from RUNNING state
    

     // Transitions from FINAL_SAVING state
    

     // Transitions from FINISHING state
    

     // Transitions from KILLING state
    

     // Transitions from FINISHED state
     // ignorable transitions
   

     // Transitions from FAILED state
     // ignorable transitions
    

     // Transitions from KILLED state
     // ignorable transitions
    

     .installTopology();

7、RMAppNewlySavingTransition

private static final class RMAppNewlySavingTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {

      long applicationLifetime =
          app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME);
      applicationLifetime = app.scheduler
          .checkAndGetApplicationLifetime(app.queue, applicationLifetime);
      //根据配置的队列生存期,验证提交的应用程序生存期是否有效。
      if (applicationLifetime > 0) {
        // calculate next timeout value
        Long newTimeout =
            Long.valueOf(app.submitTime + (applicationLifetime * 1000));
        app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
            ApplicationTimeoutType.LIFETIME, newTimeout);

        //使用新的绝对值更新applicationTimeouts
        app.applicationTimeouts.put(ApplicationTimeoutType.LIFETIME,
            newTimeout);

        LOG.info("Application " + app.applicationId
            + " is registered for timeout monitor, type="
            + ApplicationTimeoutType.LIFETIME + " value=" + applicationLifetime
            + " seconds");
      }

      //如果启用了恢复,则将应用程序信息存储在非阻塞调用中,以确保RM已存储了在RM重新启动后重新启动AM所需的信息,而无需进一步的客户端通信
      //这是一个新任务,我们看条线
      //非阻塞API资源管理器服务使用此来存储应用程序的状态此不阻塞调度程序线程RMAppStoredEvent将在完成时发送以通知RMApp
      LOG.info("Storing application with id " + app.applicationId);
      app.rmContext.getStateStore().storeNewApplication(app);
    }
  }

8、RMStateStore

public void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationStateData appState =
        ApplicationStateData.newInstance(app.getSubmitTime(),
            app.getStartTime(), context, app.getUser(), app.getRealUser(),
            app.getCallerContext());
    appState.setApplicationTimeouts(app.getApplicationTimeouts());
    //这里会设置一个状态:RMStateStoreEventType.STORE_APP,接下来我们看看这个事件怎么处理
    getRMStateStoreEventHandler().handle(new RMStateStoreAppEvent(appState));
  }
private static final StateMachineFactory<RMStateStore,
                                           RMStateStoreState,
                                           RMStateStoreEventType, 
                                           RMStateStoreEvent>
      stateMachineFactory = new StateMachineFactory<RMStateStore,
                                                    RMStateStoreState,
                                                    RMStateStoreEventType,
                                                    RMStateStoreEvent>(
      RMStateStoreState.ACTIVE)
      .addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.STORE_APP, new StoreAppTransition())
      
      //......省略......

    );
private static class StoreAppTransition
      implements MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {

      //......省略......
        store.storeApplicationStateInternal(appId, appState);
        //接下来我们看看这个事件的处理RMAppEventType.APP_NEW_SAVED)
        store.notifyApplication(
            new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED));
      //......省略......

      return finalState(isFenced);
    };

  }

9、再回RMAppImpl

第6步已经有关相关代码,我们就不贴了,处理RMAppEventType.APP_NEW_SAVED事件调用的是new AddApplicationToSchedulerTransition()

private static final class AddApplicationToSchedulerTransition extends
      RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      //里面new了一个新事件(SchedulerEventType.APP_ADDED)
      app.handler.handle(
          new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
              app.applicationPriority, app.placementContext));
      // 发送ATS创建事件
      app.sendATSCreateEvent();
    }
  }

SchedulerEventType.APP_ADDED事件会被调度器处理(FifoScheduler、CapacityScheduler、FairScheduler)

我们先看FairScheduler中的实现

10、FairScheduler

public void handle(SchedulerEvent event) {
    switch (event.getType()) {
    case NODE_ADDED:
      
    case NODE_REMOVED:
      
    case NODE_UPDATE:
      
    //......省略以上......

    case APP_ADDED:
      if (!(event instanceof AppAddedSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
      String queueName =
          resolveReservationQueueName(appAddedEvent.getQueue(),
              appAddedEvent.getApplicationId(),
              appAddedEvent.getReservationID(),
              appAddedEvent.getIsAppRecovering());
      if (queueName != null) {
        //向调度程序中添加一个新的应用程序,该应用程序具有给定的id、队列名称和用户。
        //即使用户或队列超过配置的限制,这也会接受新的应用程序,但该应用程序不会被标记为可运行。会生成一个RMAppEventType.APP_ACCEPTED事件,接下来我们看看这个事件的处理
        addApplication(appAddedEvent.getApplicationId(),
            queueName, appAddedEvent.getUser(),
            appAddedEvent.getIsAppRecovering());
      }
      break;

    //......省略以下......

    case APP_REMOVED:
      
    case NODE_RESOURCE_UPDATE:
      
    case APP_ATTEMPT_ADDED:
    //第12步会用到
    if (!(event instanceof AppAttemptAddedSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
          (AppAttemptAddedSchedulerEvent) event;
      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
        appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
        appAttemptAddedEvent.getIsAttemptRecovering());
      break;
      
    case APP_ATTEMPT_REMOVED:
      
    case RELEASE_CONTAINER:
      
    case CONTAINER_EXPIRED:
      
    default:
      LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
    }
  }

RMAppEventType.APP_ACCEPTE 的处理还是在RMAppImpl

11、再回RMAppImpl

第6步已经有关相关代码,我们就不贴了,处理RMAppEventType.APP_ACCEPTE 事件调用的是new StartAppAttemptTransition()

private static final class StartAppAttemptTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      app.createAndStartNewAttempt(false);
    };
  }

    |
    |
   \ /

private void
      createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
    //创建一个新的Attempt
    createNewAttempt();
    //这里会生成一个新事件RMAppAttemptEventType.START
    handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
      transferStateFromPreviousAttempt));
  }

RMAppAttemptEventType.START事件由RMAppAttemptImpl进行处理

12、RMAppAttemptImpl

private static final StateMachineFactory<RMAppAttemptImpl,
                                           RMAppAttemptState,
                                           RMAppAttemptEventType,
                                           RMAppAttemptEvent>
       stateMachineFactory  = new StateMachineFactory<RMAppAttemptImpl,
                                            RMAppAttemptState,
                                            RMAppAttemptEventType,
                                     RMAppAttemptEvent>(RMAppAttemptState.NEW)

       // Transitions from NEW State
      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
          RMAppAttemptEventType.START, new AttemptStartedTransition())
      
      //......省略以下......
          
      // Transitions from SUBMITTED state
      
          
       // Transitions from SCHEDULED State
       //第13步会用到
       .addTransition(RMAppAttemptState.SUBMITTED, 
          EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
                     RMAppAttemptState.SCHEDULED),
          RMAppAttemptEventType.ATTEMPT_ADDED,
          new ScheduleTransition())
      
       // Transitions from SUBMITTED state
       //第16步会用到
       .addTransition(RMAppAttemptState.SUBMITTED, 
          EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
                     RMAppAttemptState.SCHEDULED),
          RMAppAttemptEventType.ATTEMPT_ADDED,
          new ScheduleTransition())

       // Transitions from ALLOCATED_SAVING State
       //第16步会用到
       .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
          RMAppAttemptState.ALLOCATED,
          RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
      

       // Transitions from LAUNCHED_UNMANAGED_SAVING State
     

       // Transitions from ALLOCATED State
     

       // Transitions from LAUNCHED State
      

       // Transitions from RUNNING State
      

       // Transitions from FINAL_SAVING State
      

      // Transitions from FAILED State
      

      // Transitions from FINISHING State
      

      // Transitions from FINISHED State
      

      // Transitions from KILLED State
      
    .installTopology();

RMAppAttemptEventType.START事件由AttemptStartedTransition()处理

private static final class AttemptStartedTransition extends BaseTransition {
	@Override
    public void transition(RMAppAttemptImpl appAttempt,
        RMAppAttemptEvent event) {
      
      //......省略......

      //将应用程序尝试添加到计划程序,并通知计划程序是否从上次尝试转移状态。
      //生成新的事件SchedulerEventType.APP_ATTEMPT_ADDED
      appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
        appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));
    }
  }

13、再回FairScheduler

第10步FairScheduler中有对SchedulerEventType.APP_ATTEMPT_ADDED事件的处理,这里就不贴代码了

protected void addApplicationAttempt(
      ApplicationAttemptId applicationAttemptId,
      boolean transferStateFromPreviousAttempt,
      boolean isAttemptRecovering) {
    writeLock.lock();
    
        //生成一个新的事件RMAppAttemptEventType.ATTEMPT_ADDED
        rmContext.getDispatcher().getEventHandler().handle(
            new RMAppAttemptEvent(applicationAttemptId,
                RMAppAttemptEventType.ATTEMPT_ADDED));
        
  }

14、再回RMAppAttemptImpl

第12步已经有关相关代码,我们就不贴了,处理RMAppAttemptEventType.ATTEMPT_ADDED

事件调用的是new ScheduleTransition()

 public static final class ScheduleTransition
      implements
      MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
    @Override
    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
        RMAppAttemptEvent event) {

       //......省略......

        //提交时已检查AM资源
        //这里会通过YarnScheduler申请容器
        Allocation amContainerAllocation =
            appAttempt.scheduler.allocate(
                appAttempt.applicationAttemptId,
                appAttempt.amReqs, null, EMPTY_CONTAINER_RELEASE_LIST,
                amBlacklist.getBlacklistAdditions(),
                amBlacklist.getBlacklistRemovals(),
                new ContainerUpdates());
        if (amContainerAllocation != null
            && amContainerAllocation.getContainers() != null) {
          assert (amContainerAllocation.getContainers().size() == 0);
        }
        appAttempt.scheduledTime = System.currentTimeMillis();
        //现在状态是RMAppAttemptState.SCHEDULED
        return RMAppAttemptState.SCHEDULED;

      //......省略......

    }
  }

YarnScheduler.allocate(),最终会调用三种调度器中的方法进行实现,我们还是先只看FairScheduler

15、再回FairScheduler

public Allocation allocate(ApplicationAttemptId appAttemptId,
      List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests,
      List<ContainerId> release, List<String> blacklistAdditions,
      List<String> blacklistRemovals, ContainerUpdates updateRequests) {
    //请确保此应用程序存在
    FSAppAttempt application = getSchedulerApp(appAttemptId);
    if (application == null) {
      LOG.error("Calling allocate on removed or non existent application " +
          appAttemptId.getApplicationId());
      return EMPTY_ALLOCATION;
    }

    //分配可能是上一次尝试的剩余部分,它将影响当前尝试,例如混淆当前尝试的AM容器的请求和分配。
    //请注意,尝试id的外部先决条件检查在这里可能已经过时,因此有必要在这里进行双重检查。
    if (!application.getApplicationAttemptId().equals(appAttemptId)) {
      LOG.error("Calling allocate on previous or removed " +
          "or non existent application attempt " + appAttemptId);
      return EMPTY_ALLOCATION;
    }

    ApplicationId applicationId = application.getApplicationId();
    FSLeafQueue queue = application.getQueue();
    List<MaxResourceValidationResult> invalidAsks =
            validateResourceRequests(ask, queue);

    //如果检测到任何无效的请求,我们需要在这里快速失败。如果我们稍后抛出异常,这可能会有问题,        
    //因为令牌和升级/降级的容器会丢失,因为调度程序会立即清除它们,AM不会获得这些信息。
    if (!invalidAsks.isEmpty()) {
      throw new SchedulerInvalidResoureRequestException(String.format(
              "Resource request is invalid for application %s because queue %s "
                      + "has 0 amount of resource for a resource type! "
                      + "Validation result: %s",
              applicationId, queue.getName(), invalidAsks));
    }

    //处理晋升和降级
    handleContainerUpdates(application, updateRequests);

    //健全性检查
    normalizeResourceRequests(ask, queue.getName());

    // TODO, 正常计划请求

    //记录容器分配开始时间
    application.recordContainerRequestTime(getClock().getTime());

    //释放容器
    releaseContainers(release, application);

    ReentrantReadWriteLock.WriteLock lock = application.getWriteLock();
    lock.lock();
    try {
      if (!ask.isEmpty()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
              "allocate: pre-update" + " applicationAttemptId=" + appAttemptId
                  + " application=" + application.getApplicationId());
        }
        application.showRequests();

        //更新应用程序请求
        application.updateResourceRequests(ask);

        // TODO, 处理SchedulingRequest
        application.showRequests();
      }
    } finally {
      lock.unlock();
    }

    Set<ContainerId> preemptionContainerIds =
        application.getPreemptionContainerIds();
    if (LOG.isDebugEnabled()) {
      LOG.debug(
          "allocate: post-update" + " applicationAttemptId=" + appAttemptId
              + " #ask=" + ask.size() + " reservation= " + application
              .getCurrentReservation());

      LOG.debug("Preempting " + preemptionContainerIds.size()
          + " container(s)");
    }

    application.updateBlacklist(blacklistAdditions, blacklistRemovals);

    List<Container> newlyAllocatedContainers =
        application.pullNewlyAllocatedContainers();
    //记录容器分配时间
    if (!(newlyAllocatedContainers.isEmpty())) {
      application.recordContainerAllocationTime(getClock().getTime());
    }

    //净空取决于集群中的资源、队列的当前使用情况、队列的公平共享和队列的最大资源。
    Resource headroom = application.getHeadroom();
    application.setApplicationHeadroomForMetrics(headroom);

    //当AM心跳时调用。AM注册后,RM回收了这些集装箱
    //它们在AllocateResponse.containersFromPreviousAttempts()中报告给AM
    List<Container> previousAttemptContainers = application
        .pullPreviousAttemptContainers();
    //NMToken用于验证与NodeManager的通信
    //ApplicationMaster与ResourceManager协商资源时,ResourceMananger发出,NodeManager端验证
    List<NMToken> updatedNMTokens = application.pullUpdatedNMTokens();
    //制作分配单元
    return new Allocation(newlyAllocatedContainers, headroom,
        preemptionContainerIds, null, null,
        updatedNMTokens, null, null,
        application.pullNewlyPromotedContainers(),
        application.pullNewlyDemotedContainers(),
        previousAttemptContainers);
  }

分配单元构建成功后,我们看下RMAppAttemptState.SCHEDULED事件的处理,

16、再回RMAppAttemptImpl

第12步已经有关相关代码,我们就不贴了,处理RMAppAttemptState.SCHEDULED

事件调用的是new AMContainerAllocatedTransition()

private static final class AMContainerAllocatedTransition
      implements
      MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
    @Override
    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
        RMAppAttemptEvent event) {
      //从调度程序获取AM容器。
      Allocation amContainerAllocation =
          appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
            EMPTY_CONTAINER_REQUEST_LIST, null, EMPTY_CONTAINER_RELEASE_LIST, null,
            null, new ContainerUpdates());
      // 必须至少分配一个容器,因为container_allocated是在构造RMContainer并将其放入SchedulerApplication.newlyAllocatedContainers()之后发出的。

      //请注意,YarnScheduler.allocate()不能保证能够提取它,因为容器可能由于某些原因而无法提取,例如DNS不可用导致无法生成容器令牌。因此,我们返回到以前的状态并保持重试,直到提取到容器为止
      if (amContainerAllocation.getContainers().size() == 0) {
        appAttempt.retryFetchingAMContainer(appAttempt);
        return RMAppAttemptState.SCHEDULED;
      }

      //设置主容器
      Container amContainer = amContainerAllocation.getContainers().get(0);
      RMContainerImpl rmMasterContainer = (RMContainerImpl)appAttempt.scheduler
          .getRMContainer(amContainer.getId());
      //当删除一个NM时,调度器将清理容器,下面的container_FINISHED事件将处理已清理的容器。所以只需返回RMAppAttemptState。已排定
      if (rmMasterContainer == null) {
        return RMAppAttemptState.SCHEDULED;
      }
      //为尝试应用分配AM容器
      appAttempt.setMasterContainer(amContainer);
      rmMasterContainer.setAMContainer(true);
      //NMTokenSecrentManager中设置的节点用于标记此节点的NMToken是否已颁发给AM。
      //当AM容器分配给RM本身时,分配此AM容器的节点被标记为已发送的NMToken。
      //因此,清除此节点集,以便来自AM的以下分配请求能够检索相应的NMToken。
      appAttempt.rmContext.getNMTokenSecretManager()
        .clearNodeSetForAttempt(appAttempt.applicationAttemptId);
      appAttempt.getSubmissionContext().setResource(
        appAttempt.getMasterContainer().getResource());
      appAttempt.containerAllocatedTime = System.currentTimeMillis();
      long allocationDelay =
          appAttempt.containerAllocatedTime - appAttempt.scheduledTime;
      ClusterMetrics.getMetrics().addAMContainerAllocationDelay(
          allocationDelay);
      appAttempt.storeAttempt();
      //此时处理这个事件
      return RMAppAttemptState.ALLOCATED_SAVING;
    }
  }

第12步已经有关相关代码,处理RMAppAttemptState.ALLOCATED_SAVING

事件调用的是new AttemptStoredTransition()

private static final class AttemptStoredTransition extends BaseTransition {
    @Override
    public void transition(RMAppAttemptImpl appAttempt,
                                                    RMAppAttemptEvent event) {
      //将ClientTokenMasterKey保存到存储中后进行注册,否则RM重新启动后客户端可能会持有无效的ClientToken。
      appAttempt.registerClientToken();
      
      appAttempt.launchAttempt();
    }
  }

private void launchAttempt(){
    launchAMStartTime = System.currentTimeMillis();
    //发送事件以启动AM容器AMLauncherEventType.LAUNCH
    eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));
  }

五、流程总结

1、构建启动MR AM所需的所有信息(LocalResource、ContainerLaunchContext、启动命令
、ApplicationSubmissionContext等)

2、构建SubmitApplicationRequest(客户端需要通过SubmitApplicationRequest提供运行ApplicationMaster所需的队列、资源等详细信息,相当于ContainerLaunchContext,用于启动ApplicationMaster等)

3、YarnClient通过ApplicationClientProtocol提交SubmitApplicationRequest (ApplicationClientProtocol是客户端和ResourceManager之间的协议,用于提交/中止作业以及获取有关应用程序、集群指标、节点、队列和ACL的信息。)

4、转到ClientRMService提交

5、转到RMAppManager提交

6、创建RMAppEventType.START事件并处理

7、创建RMStateStoreEventType.STORE_APP事件并处理

8、创建RMAppEventType.APP_NEW_SAVED事件并被调度器处理

9、创建RMAppEventType.APP_ACCEPTE事件并处理

10、创建RMAppAttemptEventType.START事件并处理

11、创建SchedulerEventType.APP_ATTEMPT_ADDED事件并被调度器处理

12、创建RMAppAttemptState.SCHEDULED事件并处理

13、创建RMAppAttemptState.ALLOCATED_SAVING事件并处理

14、创建AMLauncherEventType.LAUNCH事件启动ApplicationMaster容器

15、使用命令启动MRAppMaster

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1413600.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Bitbucket第一次代码仓库创建/提交/创建新分支/合并分支/忽略ignore

1. 首先要在bitbucket上创建一个项目&#xff0c;这个我没有权限创建&#xff0c;是找的管理员创建的。 管理员创建之后&#xff0c;这个项目给了我权限&#xff0c;我就可以创建我的代码仓库了。 2. 点击这个Projects下的具体项目名字&#xff0c;就会进入这样一个页面&#…

EG-2121CA (晶体振荡器 低抖动表面声波(SAW)振荡器)

在当今高度数字化的时代&#xff0c;稳定的信号传输显得尤为重要。若要实现信号的稳定传输&#xff0c;晶体振荡器必不可少。EG-2121CA&#xff0c;它是一款低抖动表面声波&#xff08;SAW&#xff09;振荡器设计的产品&#xff0c;凭借其出色的频率范围、稳定的电源电压和可靠…

网络安全全栈培训笔记(58-服务攻防-应用协议设备KibanaZabbix远控向日葵VNCTV)

第58天 服务攻防-应用协议&设备Kibana&Zabbix&远控向日葵&VNC&TV 知识点&#xff1a; 1、远程控制第三方应用安全 2、三方应用-向日葵&VNC&TV 3、设备平台-Zabbix&Kibanai漏洞 章节内容&#xff1a; 常见版务应用的安全测试&#xff1a; 1…

甲基四嗪-PEG4-叠氮,Methyltetrazine PEG4 azide,可以作为连接各种生物分子的桥梁

您好&#xff0c;欢迎来到新研之家 文章关键词&#xff1a;甲基四嗪-四聚乙二醇-叠氮&#xff0c;甲基四嗪-PEG4-叠氮&#xff0c;Methyltetrazine PEG4 azide &#xff0c;Methyltetrazine PEG4 N3 一、基本信息 产品简介&#xff1a;Methyltetrazine PEG4 azide is a comp…

简单记录一下如何安装python以及pycharm(图文教程)(可供福建专升本理工类同学使用)

本教程主要给不懂计算机的或者刚刚开始学习python的同学&#xff08;福建专升本理工类&#xff09;&网友学习使用&#xff0c;基础操作&#xff0c;比较详细&#xff0c;其他问题等待补充&#xff01; 安装Python 1.进入python官网&#xff08;https://www.python.org/&a…

10.Golang中的map

目录 概述map实践map声明代码 map使用代码 结束 概述 map实践 map声明 代码 package mainimport ("fmt" )func main() {// 声明方式1var map1 map[string]stringif map1 nil {fmt.Println("map1为空")}// 没有分配空间&#xff0c;是不能使用的// map…

关于在微信小程序中使用taro + react-hook后销毁函数无法执行的问题

问题&#xff1a; 在 taro中使用navigageTo() 跳转路由后hook中useEffect 的return函数没有执行 没有执行return函数 框架版本&#xff1a; tarojs: 3.6 react: 18.0 原因&#xff1a; 使用navigateTo() 跳转路由的话并不会销毁页面和组件&#xff0c;会加入一…

携程开源 基于真实请求与数据的流量回放测试平台、自动化接口测试平台AREX

携程开源 基于真实请求与数据的流量回放测试平台、自动化接口测试平台AREX 官网文档 基于真实请求与数据的流量回放测试平台、自动化接口测试平台AREX 这篇文章稍稍水一下&#xff0c;主要讲下部署过程里踩的坑&#xff0c;因为部署的过程主要是运维同学去处理了&#xff0c;我…

【Java与网络3】Java网络编程之初体验

我们平时极少使用Java来直接写网络通信相关的程序&#xff0c;一般都使用Tomcat Web服务或者Netty等框架来帮助我们做&#xff0c;不过呢&#xff0c;要想将技术学到家&#xff0c;我们研究一下基本的网络编程还是非常必要的&#xff0c;这样可以让我们将很多内容融会贯通&…

把批量M3U8网络视频地址转为MP4视频

在数字媒体时代&#xff0c;视频格式的转换已成为一项常见的需求。尤其对于那些经常处理网络视频的用户来说&#xff0c;将M3U8格式的视频转换为更常见的MP4格式是一项必备技能。幸运的是&#xff0c;现在有了固乔剪辑助手这款强大的工具&#xff0c;这一过程变得异常简单。下面…

单片机学习笔记---矩阵键盘

目录 矩阵键盘的介绍 独立按键和矩阵按键的相同之处&#xff1a; 矩阵按键的扫描 代码演示 代码模块化移植 Keil自定义模板步骤&#xff1a; 代码编写 矩阵键盘就是开发板上右下角的这个模块 这一节的代码是基于上一节讲的LCD1602液晶显示屏驱动代码进行的 矩阵键盘的介…

大数据学习之Flink算子、了解(Source)源算子(基础篇二)

Source源算子&#xff08;基础篇二&#xff09; 目录 Source源算子&#xff08;基础篇二&#xff09; 二、源算子&#xff08;source&#xff09; 1. 准备工作 2.从集合中读取数据 可以使用代码中的fromCollection()方法直接读取列表 也可以使用代码中的fromElements()方…

用户密码网络传输、保存方案分析

大华 1、大华19年的IPC&#xff0c;登录认证接口有两次&#xff0c;第一次请求算法所需数据&#xff0c;第二次传输摘要值&#xff0c;看样子是私有算法。 2、添加用户传输用户密码等敏感数据时&#xff0c;使用"RPAC-256"算法&#xff0c;应该是大华内部的私有算法。…

【从零到一】跑通CATR(二):在并行超算云上使用Cifar-10进行测试

从零到一配环境篇 由于今年要展开大量的编程工作&#xff0c;实验室在用的云计算平台是并行超算云&#xff0c;因此打算在寒假期间先熟悉一下超算云的环境&#xff0c;并从配套的文档和网上的教程开始&#xff0c;从零到一先跑通一个用于音视频分割的模型CATR。 以blog的形式…

docker-compose部署单机ES+Kibana

记录部署的操作步骤 准备工作编写docker-compose.yml启动服务验证部署结果 本次elasticsearch和kibana版本为8.2.2 使用环境&#xff1a;centos7.9 本次记录还包括&#xff1a;安装elasticsearch中文分词插件和拼音分词插件 准备工作 1、创建目录和填写配置 mkdir /home/es/s…

Vue3中的ref和shallowRef、reactive和shallowReactive

一&#xff1a;ref、reactive简介 ref和reactive是Vue3中定义响应式数据的一种方式。ref通常用来定义基础类型数据。reactive通常用来定义复杂类型数据。 二、shallowRef、shallowReactive简介 shallowRef和shallowReactive是Vue3中定义浅层次响应式数据的方式 三、Api使用对比…

【寒假每日一题·2024】AcWing 5307. 小苹果(补)

文章目录 一、题目1、原题链接2、题目描述 二、解题报告1、思路分析2、时间复杂度3、代码详解 三、知识风暴 一、题目 1、原题链接 5307. 小苹果 2、题目描述 二、解题报告 1、思路分析 思路参考y总&#xff1a;y总讲解视频 &#xff08;1&#xff09;根据题目可以分析出&…

中小型企业机房设计部署方案

我对接参与过至少十几个分公司和总部的机房设计&#xff0c;结合十几年实际工作管理经验&#xff0c;归纳设计了以下这个机房方案&#xff0c;这个机房最大化利用了空间的同时&#xff0c;最大化设计了各方面的冗余。 机房包含了UPS隔离&#xff0c;噪音隔离&#xff0c;功率冗…

SpringBoot之分页查询的使用

背景 在业务中我们在前端总是需要展示数据&#xff0c;将后端得到的数据进行分页处理&#xff0c;通过pagehelper实现动态的分页查询&#xff0c;将查询页数和分页数通过前端发送到后端&#xff0c;后端使用pagehelper&#xff0c;底层是封装threadlocal得到页数和分页数并动态…

2. MySQL 多实例

重点&#xff1a; MySQL 的 三种安装方式&#xff1a;包安装&#xff0c;二进制安装&#xff0c;源码编译安装。 MySQL 的 基本使用 MySQL 多实例 DDLcreate alter drop DML insert update delete DQL select 2.5&#xff09;通用 二进制格式安装 MySQL 2.5.1&#xff…