Flow of submitting a MapReduce job to YARN
Step 1: Upload the job jar, configuration files, and split files
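The job files are uploaded to an HDFS staging directory. Its root comes from the client's local configuration property yarn.app.mapreduce.am.staging-dir (default: /tmp/hadoop-yarn/staging). Each user gets a .staging subdirectory, and each job gets a subdirectory named after its job ID:
/tmp/hadoop-yarn/staging/$user/.staging
For example, for user hdfs and job job_1722390700850_0001: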
/tmp/hadoop-yarn/staging/hdfs/.staging/job_1722390700850_0001/job.split
/tmp/hadoop-yarn/staging/hdfs/.staging/job_1722390700850_0001/job.splitmetainfo
/tmp/hadoop-yarn/staging/hdfs/.staging/job_1722390700850_0001/job.xml
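A minimal sketch of how these staging paths are composed, assuming plain string concatenation with "/" (Hadoop itself builds them with org.apache.hadoop.fs.Path). The class and method names are hypothetical, not Hadoop APIs, and only the three files listed above are produced.

```java
import java.util.List;

/**
 * Hypothetical helper (not a Hadoop API) that mirrors the per-job staging
 * layout under yarn.app.mapreduce.am.staging-dir.
 */
public class StagingLayout {
    // Default value of yarn.app.mapreduce.am.staging-dir
    static final String DEFAULT_STAGING_ROOT = "/tmp/hadoop-yarn/staging";

    /** <stagingRoot>/<user>/.staging, falling back to the default root. */
    static String userStagingDir(String stagingRoot, String user) {
        String root = (stagingRoot == null || stagingRoot.isEmpty())
                ? DEFAULT_STAGING_ROOT : stagingRoot;
        return root + "/" + user + "/.staging";
    }

    /** <stagingRoot>/<user>/.staging/<jobId> */
    static String jobStagingDir(String stagingRoot, String user, String jobId) {
        return userStagingDir(stagingRoot, user) + "/" + jobId;
    }

    /** The three files listed in these notes (split, split meta info, job.xml). */
    static List<String> jobFiles(String stagingRoot, String user, String jobId) {
        String dir = jobStagingDir(stagingRoot, user, jobId);
        return List.of(dir + "/job.split", dir + "/job.splitmetainfo", dir + "/job.xml");
    }

    public static void main(String[] args) {
        // null root -> use the default staging root
        for (String f : jobFiles(null, "hdfs", "job_1722390700850_0001")) {
            System.out.println(f);
        }
    }
}
```

Running main prints the same three paths shown above for user hdfs.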
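Step 2: Build the ApplicationSubmissionContext
This mainly prepares what the ApplicationMaster (AM) needs in order to start:
- Local resources the node must download: the files uploaded in Step 1 (the split files and job.xml) plus the job jar
- Token (credential) information
- The CLASSPATH environment variable; by default its entries are, in order: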
CLASSPATH=%PWD%
...
%HADOOP_CONF_DIR%
%HADOOP_COMMON_HOME%/share/hadoop/common/*
%HADOOP_COMMON_HOME%/share/hadoop/common/lib/*
%HADOOP_HDFS_HOME%/share/hadoop/hdfs/*
%HADOOP_HDFS_HOME%/share/hadoop/hdfs/lib/*
%HADOOP_YARN_HOME%/share/hadoop/yarn/*
%HADOOP_YARN_HOME%/share/hadoop/yarn/lib/*
%HADOOP_MAPRED_HOME%\share\hadoop\mapreduce\*
%HADOOP_MAPRED_HOME%\share\hadoop\mapreduce\lib\*
job.jar/*
job.jar/classes/
job.jar/lib/*
%PWD%/*
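Setting mapreduce.job.user.classpath.first = true changes how the classpath is generated: the framework jars are appended at the end, after the job.jar and %PWD%/* entries, so user classes take precedence. The framework entries that move to the end are: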
%HADOOP_CONF_DIR%
%HADOOP_COMMON_HOME%/share/hadoop/common/*
%HADOOP_COMMON_HOME%/share/hadoop/common/lib/*
%HADOOP_HDFS_HOME%/share/hadoop/hdfs/*
%HADOOP_HDFS_HOME%/share/hadoop/hdfs/lib/*
%HADOOP_YARN_HOME%/share/hadoop/yarn/*
%HADOOP_YARN_HOME%/share/hadoop/yarn/lib/*
%HADOOP_MAPRED_HOME%\share\hadoop\mapreduce\*
%HADOOP_MAPRED_HOME%\share\hadoop\mapreduce\lib\*
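The relevant code is MRApps.setClasspath in MRApps.java. Note that when mapreduce.job.classloader is true, the entries go into APP_CLASSPATH instead of CLASSPATH: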
  public static void setClasspath(Map<String, String> environment,
      Configuration conf) throws IOException {
    boolean userClassesTakesPrecedence =
      conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
    String classpathEnvVar =
      conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)
        ? Environment.APP_CLASSPATH.name() : Environment.CLASSPATH.name();
    MRApps.addToEnvironment(environment,
      classpathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD), conf);
    if (!userClassesTakesPrecedence) {
      MRApps.setMRFrameworkClasspath(environment, conf); // append the framework (Hadoop) jars here
    }
    /*
     * We use "*" for the name of the JOB_JAR instead of MRJobConfig.JOB_JAR for
     * the case where the job jar is not necessarily named "job.jar". This can
     * happen, for example, when the job is leveraging a resource from the YARN
     * shared cache.
     */
    MRApps.addToEnvironment(
        environment,
        classpathEnvVar,
        MRJobConfig.JOB_JAR + Path.SEPARATOR + "*", conf);
    MRApps.addToEnvironment(
        environment,
        classpathEnvVar,
        MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR, conf);
    MRApps.addToEnvironment(
        environment,
        classpathEnvVar,
        MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*", conf);
    MRApps.addToEnvironment(
        environment,
        classpathEnvVar,
        crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*", conf);
    // a * in the classpath will only find a .jar, so we need to filter out
    // all .jars and add everything else
    addToClasspathIfNotJar(JobContextImpl.getFileClassPaths(conf),
        JobContextImpl.getCacheFiles(conf),
        conf,
        environment, classpathEnvVar);
    addToClasspathIfNotJar(JobContextImpl.getArchiveClassPaths(conf),
        JobContextImpl.getCacheArchives(conf),
        conf,
        environment, classpathEnvVar);
    if (userClassesTakesPrecedence) {
      MRApps.setMRFrameworkClasspath(environment, conf); // user-classpath-first: framework jars go last
    }
  }
- The command that launches the AM:
%JAVA_HOME%/bin/java -Djava.io.tmpdir=%PWD%/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=<LOG_DIR> -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=info,CLA -Dhadoop.root.logfile=syslog -Dfile.encoding=UTF-8 org.apache.hadoop.mapreduce.v2.app.MRAppMaster 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr
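- The resources requested for the AM container (by default yarn.app.mapreduce.am.resource.mb=1536 and yarn.app.mapreduce.am.resource.cpu-vcores=1):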
{AllocationRequestId: -1, Priority: 0, Capability: <memory:1536, vCores:1>, # Containers: 1, Location: *, Relax Locality: true, Execution Type Request: null, Node Label Expression: null}
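The ordering logic of MRApps.setClasspath can be modeled with a plain list. This is a simplified, hypothetical sketch, not the Hadoop code: it ignores the distributed-cache entries added by addToClasspathIfNotJar and the CLASSPATH vs. APP_CLASSPATH choice, and it treats the framework entries as one opaque list.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the entry order produced by MRApps.setClasspath
 * with mapreduce.job.user.classpath.first = false / true.
 */
public class ClasspathOrder {
    static List<String> build(List<String> frameworkEntries, boolean userClasspathFirst) {
        List<String> cp = new ArrayList<>();
        cp.add("%PWD%");                      // working directory always comes first
        if (!userClasspathFirst) {
            cp.addAll(frameworkEntries);      // default: framework jars before user jars
        }
        cp.add("job.jar/*");                  // entries from the localized job jar
        cp.add("job.jar/classes/");
        cp.add("job.jar/lib/*");
        cp.add("%PWD%/*");                    // jars localized into the container working dir
        if (userClasspathFirst) {
            cp.addAll(frameworkEntries);      // user classes win: framework jars go last
        }
        return cp;
    }

    public static void main(String[] args) {
        List<String> framework = List.of("%HADOOP_CONF_DIR%",
                "%HADOOP_COMMON_HOME%/share/hadoop/common/*");
        // ";" is the Windows classpath separator, matching the %VAR% notation above
        System.out.println(String.join(";", build(framework, false)));
        System.out.println(String.join(";", build(framework, true)));
    }
}
```

The only difference between the two modes is where the framework block is spliced in; the user entries keep their relative order either way.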
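Step 3: Submit to the ResourceManager (RM)
The client makes a remote (RPC) call to ClientRMService.submitApplication. ClientRMService is the RM's client-facing RPC server (default port 8032, configured by yarn.resourcemanager.address) and implements the ApplicationClientProtocol protocol.
ApplicationClientProtocol methods: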
getNewApplication
submitApplication
failApplicationAttempt
forceKillApplication
getClusterMetrics
getClusterNodes
getQueueInfo
getQueueUserAcls
moveApplicationAcrossQueues
getNewReservation
submitReservation
updateReservation
deleteReservation
listReservations
getNodeToLabels
getLabelsToNodes
getClusterNodeLabels
updateApplicationPriority
signalToContainer
updateApplicationTimeouts
getResourceProfiles
getResourceProfile
getResourceTypeInfo
getAttributesToNodes
getClusterNodeAttributes
getNodesToAttributes
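Of these methods, job submission mainly uses two: getNewApplication hands out an application ID, and submitApplication sends the ApplicationSubmissionContext. In Hadoop's MR client, getNewApplication is already called before Step 1, because the job ID used in the staging path is derived from the application ID (same cluster timestamp and sequence number). The toy in-memory model below illustrates that sequence; FakeRm and its methods are hypothetical stand-ins, not the real protobuf-based RPC API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of getNewApplication -> submitApplication (hypothetical names). */
public class SubmitFlow {
    /** Hypothetical stand-in for the RM side of ApplicationClientProtocol. */
    static class FakeRm {
        private final long clusterTimestamp;
        private final AtomicInteger nextId = new AtomicInteger(1);
        private final Map<String, String> submitted = new HashMap<>();

        FakeRm(long clusterTimestamp) { this.clusterTimestamp = clusterTimestamp; }

        /** Like getNewApplication: hand out a fresh application ID. */
        String getNewApplication() {
            return String.format("application_%d_%04d", clusterTimestamp, nextId.getAndIncrement());
        }

        /** Like submitApplication: accept the AM launch description once per ID. */
        void submitApplication(String appId, String amCommand) {
            if (submitted.putIfAbsent(appId, amCommand) != null) {
                throw new IllegalStateException("already submitted: " + appId);
            }
        }

        boolean isSubmitted(String appId) { return submitted.containsKey(appId); }
    }

    /** An MR job ID reuses the timestamp and sequence number of its application ID. */
    static String toJobId(String appId) {
        return "job_" + appId.substring("application_".length());
    }

    public static void main(String[] args) {
        FakeRm rm = new FakeRm(1722390700850L);
        String appId = rm.getNewApplication();   // obtained before files are staged
        rm.submitApplication(appId, "org.apache.hadoop.mapreduce.v2.app.MRAppMaster");
        System.out.println(appId + " -> " + toJobId(appId));
    }
}
```

With the cluster timestamp from the staging paths above, main prints application_1722390700850_0001 -> job_1722390700850_0001.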